diff options
author | clarkzjw <[email protected]> | 2023-02-20 23:56:58 -0800 |
---|---|---|
committer | clarkzjw <[email protected]> | 2023-02-20 23:56:58 -0800 |
commit | 25c5863df871629c74b3f8d678a0a8c9e786e227 (patch) | |
tree | fcfcd40df3ce572a585401a7fd238271d4032d33 | |
parent | 33648c2d0b827278b3525ee5ddb480eaa15b15cb (diff) | |
download | swarm2fediverse-25c5863df871629c74b3f8d678a0a8c9e786e227.tar.gz |
allow users to upload photos
-rw-r--r-- | bot.py | 226 |
1 files changed, 196 insertions, 30 deletions
@@ -2,12 +2,8 @@ | |||
2 | # pylint: disable=unused-argument, wrong-import-position | 2 | # pylint: disable=unused-argument, wrong-import-position |
3 | # This program is dedicated to the public domain under the CC0 license. | 3 | # This program is dedicated to the public domain under the CC0 license. |
4 | 4 | ||
5 | """ | ||
6 | Basic example for a bot that uses inline keyboards. For an in-depth explanation, check out | ||
7 | https://github.com/python-telegram-bot/python-telegram-bot/wiki/InlineKeyboard-Example. | ||
8 | """ | ||
9 | import logging | 5 | import logging |
10 | 6 | import io | |
11 | import telegram.constants | 7 | import telegram.constants |
12 | from telegram import __version__ as TG_VER | 8 | from telegram import __version__ as TG_VER |
13 | 9 | ||
@@ -22,14 +18,20 @@ if __version_info__ < (20, 0, 0, "alpha", 1): | |||
22 | f"{TG_VER} version of this example, " | 18 | f"{TG_VER} version of this example, " |
23 | f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" | 19 | f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" |
24 | ) | 20 | ) |
25 | from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update | 21 | from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardRemove |
26 | from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters | 22 | from telegram.ext import Application, CallbackQueryHandler, \ |
27 | 23 | CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext, JobQueue | |
28 | from config import BOT_TOKEN | 24 | from config import BOT_TOKEN |
29 | from foursquare.poi import query_poi, get_poi_top_photo | 25 | from foursquare.poi import query_poi |
30 | from dbstore.dbm_store import get_loc | 26 | from dbstore.dbm_store import get_loc |
31 | from toot import mastodon_client | 27 | from toot import mastodon_client |
32 | import urllib.request | 28 | from typing import TypedDict, List, cast |
29 | from telegram import Update, InputMediaVideo, InputMediaPhoto | ||
30 | |||
31 | scheduler = None | ||
32 | PRIVACY, TOOT = map(chr, range(8, 10)) | ||
33 | |||
34 | WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL = range(5) | ||
33 | 35 | ||
34 | # Enable logging | 36 | # Enable logging |
35 | logging.basicConfig( | 37 | logging.basicConfig( |
@@ -47,6 +49,12 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |||
47 | "Start using this bot by sharing your location using Telegram context menu to it." | 49 | "Start using this bot by sharing your location using Telegram context menu to it." |
48 | 50 | ||
49 | await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) | 51 | await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) |
52 | await update.message.reply_text("Please choose", | ||
53 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
54 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
55 | [telegram.KeyboardButton(text="Setting")]])) | ||
56 | |||
57 | return LOCATION | ||
50 | 58 | ||
51 | 59 | ||
52 | async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | 60 | async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
@@ -60,31 +68,85 @@ async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |||
60 | reply_markup = InlineKeyboardMarkup(keyboard) | 68 | reply_markup = InlineKeyboardMarkup(keyboard) |
61 | await update.message.reply_text("Select a place", reply_markup=reply_markup) | 69 | await update.message.reply_text("Select a place", reply_markup=reply_markup) |
62 | 70 | ||
71 | return WAIT_LOC | ||
63 | 72 | ||
64 | async def button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
65 | """Parses the CallbackQuery and updates the message text.""" | ||
66 | query = update.callback_query | ||
67 | 73 | ||
74 | async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
75 | query = update.callback_query | ||
68 | await query.answer() | 76 | await query.answer() |
69 | # TODO | 77 | print(query.data) |
70 | # ask user whether they would like to attach their own photos | 78 | context.user_data["fsq_id"] = query.data |
71 | 79 | await query.delete_message() | |
72 | poi = get_loc(query.data) | ||
73 | photo_url = get_poi_top_photo(query.data) | ||
74 | media_id = None | ||
75 | if photo_url is not None: | ||
76 | with urllib.request.urlopen(photo_url) as response: | ||
77 | data = response.read() | ||
78 | media = mastodon_client.media_post(data, mime_type="image/jpeg") | ||
79 | media_id = [media["id"]] | ||
80 | 80 | ||
81 | poi = get_loc(context.user_data["fsq_id"]) | ||
82 | media_id = [] | ||
83 | content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" | ||
81 | status = mastodon_client.status_post( | 84 | status = mastodon_client.status_post( |
85 | content, | ||
86 | visibility="private", | ||
87 | media_ids=media_id) | ||
88 | |||
89 | context.user_data["status_id"] = status["id"] | ||
90 | context.user_data["status_content"] = content | ||
91 | |||
92 | print("status_id", context.user_data["status_id"]) | ||
93 | |||
94 | await query.message.reply_text( | ||
95 | text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", | ||
96 | parse_mode=telegram.constants.ParseMode.MARKDOWN, | ||
97 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
98 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
99 | [telegram.KeyboardButton(text="Setting")]]) | ||
100 | ) | ||
101 | |||
102 | await query.message.reply_text("You can continue attaching photos, or press skip to continue", | ||
103 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
104 | [telegram.KeyboardButton(text="/skip")]])) | ||
105 | return PHOTO | ||
106 | |||
107 | |||
108 | async def action(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
109 | if update.message.text == "Check in": | ||
110 | await update.message.reply_text("Please share your location", | ||
111 | reply_markup=telegram.ReplyKeyboardRemove()) | ||
112 | elif update.message.text == "Setting": | ||
113 | await update.message.reply_text("Setting") | ||
114 | |||
115 | |||
116 | async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
117 | keyboard = [[ | ||
118 | InlineKeyboardButton("Privacy", callback_data=PRIVACY), | ||
119 | ]] | ||
120 | |||
121 | reply_markup = InlineKeyboardMarkup(keyboard) | ||
122 | await update.message.reply_text("Setting", reply_markup=reply_markup) | ||
123 | |||
124 | |||
125 | async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
126 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
127 | |||
128 | fsq_id = context.user_data["fsq_id"] | ||
129 | poi = get_loc(context.user_data["fsq_id"]) | ||
130 | media_id = [] | ||
131 | |||
132 | if context.user_data.get("photo") is not None: | ||
133 | media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") | ||
134 | media_id = [media["id"]] | ||
135 | # else: | ||
136 | # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) | ||
137 | # if photo_url is not None: | ||
138 | # with urllib.request.urlopen(photo_url) as response: | ||
139 | # data = response.read() | ||
140 | # media = mastodon_client.media_post(data, mime_type="image/jpeg") | ||
141 | # media_id = [media["id"]] | ||
142 | |||
143 | mastodon_client.status_post( | ||
82 | f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", | 144 | f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", |
83 | visibility="private", | 145 | visibility="private", |
84 | media_ids=media_id) | 146 | media_ids=media_id) |
85 | 147 | ||
86 | await query.edit_message_text(text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", | 148 | await update.message.delete_message() |
87 | parse_mode=telegram.constants.ParseMode.MARKDOWN) | 149 | return ConversationHandler.END |
88 | 150 | ||
89 | 151 | ||
90 | async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | 152 | async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
@@ -92,13 +154,117 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No | |||
92 | await update.message.reply_text("Use /start to test this bot.") | 154 | await update.message.reply_text("Use /start to test this bot.") |
93 | 155 | ||
94 | 156 | ||
157 | async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
158 | """Cancels and ends the conversation.""" | ||
159 | user = update.message.from_user | ||
160 | logger.info("User %s canceled the conversation.", user.first_name) | ||
161 | await update.message.reply_text( | ||
162 | "Bye! I hope we can talk again some day.", reply_markup=ReplyKeyboardRemove() | ||
163 | ) | ||
164 | |||
165 | return ConversationHandler.END | ||
166 | |||
167 | |||
168 | class MsgDict(TypedDict): | ||
169 | media_id: str | ||
170 | caption: str | ||
171 | status_id: int | ||
172 | content: str | ||
173 | chat_id: int | ||
174 | |||
175 | |||
176 | async def media_group_sender(context: CallbackContext): | ||
177 | context.job.data = cast(List[MsgDict], context.job.data) | ||
178 | |||
179 | media_id = [] | ||
180 | chat_id = context.job.data[0].get("chat_id") | ||
181 | for msg_dict in context.job.data: | ||
182 | file = await context.bot.get_file(msg_dict.get("media_id")) | ||
183 | img = io.BytesIO() | ||
184 | await file.download_to_memory(img) | ||
185 | |||
186 | img.seek(0) | ||
187 | |||
188 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
189 | media_id.append(media["id"]) | ||
190 | |||
191 | mastodon_client.status_update( | ||
192 | status=msg_dict.get("content"), | ||
193 | id=msg_dict.get("status_id"), | ||
194 | media_ids=media_id) | ||
195 | |||
196 | await context.bot.send_message(chat_id=chat_id, text="Done", | ||
197 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
198 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
199 | [telegram.KeyboardButton(text="Setting")]])) | ||
200 | |||
201 | |||
202 | async def photo(update: Update, context: CallbackContext): | ||
203 | """Stores the photo and asks for a location.""" | ||
204 | global scheduler | ||
205 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
206 | |||
207 | status_id = context.user_data["status_id"] | ||
208 | status_content = context.user_data["status_content"] | ||
209 | |||
210 | message = update.effective_message | ||
211 | context.user_data["media"] = [] | ||
212 | if message.media_group_id: | ||
213 | media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id | ||
214 | msg_dict = { | ||
215 | "media_id": media_id, | ||
216 | "caption": message.caption_html, | ||
217 | "status_id": status_id, | ||
218 | "content": status_content, | ||
219 | "chat_id": message.chat_id, | ||
220 | } | ||
221 | global jobname | ||
222 | jobname = str(message.media_group_id) | ||
223 | jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) | ||
224 | if jobs: | ||
225 | jobs[0].data.append(msg_dict) | ||
226 | else: | ||
227 | context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], | ||
228 | name=str(message.media_group_id)) | ||
229 | else: | ||
230 | file = await update.message.effective_attachment[-1].get_file() | ||
231 | img = io.BytesIO() | ||
232 | await file.download_to_memory(img) | ||
233 | img.seek(0) | ||
234 | context.user_data["photo"].append(img.read()) | ||
235 | |||
236 | await process_location(update, context) | ||
237 | |||
238 | |||
239 | async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
240 | print(context.user_data) | ||
241 | reply_markup = telegram.ReplyKeyboardRemove() | ||
242 | await update.message.reply_text( | ||
243 | text="Done.", reply_markup=reply_markup | ||
244 | ) | ||
245 | return ConversationHandler.END | ||
246 | |||
247 | |||
95 | def main() -> None: | 248 | def main() -> None: |
96 | application = Application.builder().token(BOT_TOKEN).build() | 249 | application = Application.builder().token(BOT_TOKEN).build() |
97 | 250 | ||
98 | application.add_handler(CommandHandler("start", start)) | 251 | conv_handler = ConversationHandler( |
99 | application.add_handler(CallbackQueryHandler(button)) | 252 | entry_points=[ |
100 | application.add_handler(MessageHandler(filters.LOCATION & ~filters.COMMAND, checkin)) | 253 | CommandHandler("start", start), |
101 | application.add_handler(CommandHandler("help", help_command)) | 254 | ], |
255 | states={ | ||
256 | LOCATION: [ | ||
257 | MessageHandler(filters.LOCATION, checkin), | ||
258 | ], | ||
259 | WAIT_LOC: [CallbackQueryHandler(process_callback)], | ||
260 | PHOTO: [MessageHandler(filters.PHOTO, photo), | ||
261 | CommandHandler("skip", skip_photo)], | ||
262 | }, | ||
263 | fallbacks=[CommandHandler("cancel", cancel)], | ||
264 | per_message=False, | ||
265 | ) | ||
266 | |||
267 | application.add_handler(conv_handler) | ||
102 | 268 | ||
103 | # Run the bot until the user presses Ctrl-C | 269 | # Run the bot until the user presses Ctrl-C |
104 | application.run_polling() | 270 | application.run_polling() |