diff options
Diffstat (limited to 'bot.py')
-rw-r--r-- | bot.py | 233 |
1 files changed, 213 insertions, 20 deletions
@@ -2,12 +2,8 @@ | |||
2 | # pylint: disable=unused-argument, wrong-import-position | 2 | # pylint: disable=unused-argument, wrong-import-position |
3 | # This program is dedicated to the public domain under the CC0 license. | 3 | # This program is dedicated to the public domain under the CC0 license. |
4 | 4 | ||
5 | """ | ||
6 | Basic example for a bot that uses inline keyboards. For an in-depth explanation, check out | ||
7 | https://github.com/python-telegram-bot/python-telegram-bot/wiki/InlineKeyboard-Example. | ||
8 | """ | ||
9 | import logging | 5 | import logging |
10 | 6 | import io | |
11 | import telegram.constants | 7 | import telegram.constants |
12 | from telegram import __version__ as TG_VER | 8 | from telegram import __version__ as TG_VER |
13 | 9 | ||
@@ -22,13 +18,20 @@ if __version_info__ < (20, 0, 0, "alpha", 1): | |||
22 | f"{TG_VER} version of this example, " | 18 | f"{TG_VER} version of this example, " |
23 | f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" | 19 | f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" |
24 | ) | 20 | ) |
25 | from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update | 21 | from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardRemove |
26 | from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters | 22 | from telegram.ext import Application, CallbackQueryHandler, \ |
27 | 23 | CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext, JobQueue | |
28 | from config import BOT_TOKEN | 24 | from config import BOT_TOKEN |
29 | from foursquare.query_poi import query_poi | 25 | from foursquare.poi import query_poi |
30 | from dbstore.dbm_store import get_loc | 26 | from dbstore.dbm_store import get_loc |
31 | from toot import mastodon_client | 27 | from toot import mastodon_client |
28 | from typing import TypedDict, List, cast | ||
29 | from telegram import Update, InputMediaVideo, InputMediaPhoto | ||
30 | |||
31 | scheduler = None | ||
32 | PRIVACY, TOOT = map(chr, range(8, 10)) | ||
33 | |||
34 | WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL = range(5) | ||
32 | 35 | ||
33 | # Enable logging | 36 | # Enable logging |
34 | logging.basicConfig( | 37 | logging.basicConfig( |
@@ -46,6 +49,12 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |||
46 | "Start using this bot by sharing your location using Telegram context menu to it." | 49 | "Start using this bot by sharing your location using Telegram context menu to it." |
47 | 50 | ||
48 | await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) | 51 | await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) |
52 | await update.message.reply_text("Please choose", | ||
53 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
54 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
55 | [telegram.KeyboardButton(text="Setting")]])) | ||
56 | |||
57 | return LOCATION | ||
49 | 58 | ||
50 | 59 | ||
51 | async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | 60 | async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
@@ -59,20 +68,85 @@ async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |||
59 | reply_markup = InlineKeyboardMarkup(keyboard) | 68 | reply_markup = InlineKeyboardMarkup(keyboard) |
60 | await update.message.reply_text("Select a place", reply_markup=reply_markup) | 69 | await update.message.reply_text("Select a place", reply_markup=reply_markup) |
61 | 70 | ||
71 | return WAIT_LOC | ||
62 | 72 | ||
63 | async def button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
64 | """Parses the CallbackQuery and updates the message text.""" | ||
65 | query = update.callback_query | ||
66 | 73 | ||
74 | async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
75 | query = update.callback_query | ||
67 | await query.answer() | 76 | await query.answer() |
68 | poi = get_loc(query.data) | 77 | print(query.data) |
78 | context.user_data["fsq_id"] = query.data | ||
79 | await query.delete_message() | ||
69 | 80 | ||
81 | poi = get_loc(context.user_data["fsq_id"]) | ||
82 | media_id = [] | ||
83 | content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" | ||
70 | status = mastodon_client.status_post( | 84 | status = mastodon_client.status_post( |
85 | content, | ||
86 | visibility="private", | ||
87 | media_ids=media_id) | ||
88 | |||
89 | context.user_data["status_id"] = status["id"] | ||
90 | context.user_data["status_content"] = content | ||
91 | |||
92 | print("status_id", context.user_data["status_id"]) | ||
93 | |||
94 | await query.message.reply_text( | ||
95 | text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", | ||
96 | parse_mode=telegram.constants.ParseMode.MARKDOWN, | ||
97 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
98 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
99 | [telegram.KeyboardButton(text="Setting")]]) | ||
100 | ) | ||
101 | |||
102 | await query.message.reply_text("You can continue attaching photos, or press skip to continue", | ||
103 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
104 | [telegram.KeyboardButton(text="/skip")]])) | ||
105 | return PHOTO | ||
106 | |||
107 | |||
108 | async def action(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
109 | if update.message.text == "Check in": | ||
110 | await update.message.reply_text("Please share your location", | ||
111 | reply_markup=telegram.ReplyKeyboardRemove()) | ||
112 | elif update.message.text == "Setting": | ||
113 | await update.message.reply_text("Setting") | ||
114 | |||
115 | |||
116 | async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
117 | keyboard = [[ | ||
118 | InlineKeyboardButton("Privacy", callback_data=PRIVACY), | ||
119 | ]] | ||
120 | |||
121 | reply_markup = InlineKeyboardMarkup(keyboard) | ||
122 | await update.message.reply_text("Setting", reply_markup=reply_markup) | ||
123 | |||
124 | |||
125 | async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
126 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
127 | |||
128 | fsq_id = context.user_data["fsq_id"] | ||
129 | poi = get_loc(context.user_data["fsq_id"]) | ||
130 | media_id = [] | ||
131 | |||
132 | if context.user_data.get("photo") is not None: | ||
133 | media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") | ||
134 | media_id = [media["id"]] | ||
135 | # else: | ||
136 | # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) | ||
137 | # if photo_url is not None: | ||
138 | # with urllib.request.urlopen(photo_url) as response: | ||
139 | # data = response.read() | ||
140 | # media = mastodon_client.media_post(data, mime_type="image/jpeg") | ||
141 | # media_id = [media["id"]] | ||
142 | |||
143 | mastodon_client.status_post( | ||
71 | f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", | 144 | f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", |
72 | visibility="private") | 145 | visibility="private", |
146 | media_ids=media_id) | ||
73 | 147 | ||
74 | await query.edit_message_text(text=f"Selected place: {poi['name']}\nPosted to Mastodon: {status['url']}", | 148 | await update.message.delete_message() |
75 | parse_mode=telegram.constants.ParseMode.MARKDOWN) | 149 | return ConversationHandler.END |
76 | 150 | ||
77 | 151 | ||
78 | async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | 152 | async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
@@ -80,13 +154,132 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No | |||
80 | await update.message.reply_text("Use /start to test this bot.") | 154 | await update.message.reply_text("Use /start to test this bot.") |
81 | 155 | ||
82 | 156 | ||
157 | async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
158 | """Cancels and ends the conversation.""" | ||
159 | user = update.message.from_user | ||
160 | logger.info("User %s canceled the conversation.", user.first_name) | ||
161 | await update.message.reply_text( | ||
162 | "Bye! I hope we can talk again some day.", reply_markup=ReplyKeyboardRemove() | ||
163 | ) | ||
164 | |||
165 | return ConversationHandler.END | ||
166 | |||
167 | |||
168 | class MsgDict(TypedDict): | ||
169 | media_id: str | ||
170 | caption: str | ||
171 | status_id: int | ||
172 | content: str | ||
173 | chat_id: int | ||
174 | |||
175 | |||
176 | async def media_group_sender(context: CallbackContext): | ||
177 | context.job.data = cast(List[MsgDict], context.job.data) | ||
178 | |||
179 | media_id = [] | ||
180 | chat_id = context.job.data[0].get("chat_id") | ||
181 | for msg_dict in context.job.data: | ||
182 | if len(media_id) >= 4: | ||
183 | print("Cannot attach more than 4 photos") | ||
184 | break | ||
185 | file = await context.bot.get_file(msg_dict.get("media_id")) | ||
186 | img = io.BytesIO() | ||
187 | await file.download_to_memory(img) | ||
188 | |||
189 | img.seek(0) | ||
190 | |||
191 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
192 | media_id.append(media["id"]) | ||
193 | |||
194 | mastodon_client.status_update( | ||
195 | status=msg_dict.get("content"), | ||
196 | id=msg_dict.get("status_id"), | ||
197 | media_ids=media_id) | ||
198 | |||
199 | await context.bot.send_message(chat_id=chat_id, text="Done", | ||
200 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
201 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
202 | [telegram.KeyboardButton(text="Setting")]])) | ||
203 | |||
204 | |||
205 | async def photo(update: Update, context: CallbackContext): | ||
206 | """Stores the photo and asks for a location.""" | ||
207 | global scheduler | ||
208 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
209 | |||
210 | status_id = context.user_data["status_id"] | ||
211 | status_content = context.user_data["status_content"] | ||
212 | |||
213 | message = update.effective_message | ||
214 | context.user_data["media"] = [] | ||
215 | if message.media_group_id: | ||
216 | media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id | ||
217 | msg_dict = { | ||
218 | "media_id": media_id, | ||
219 | "caption": message.caption_html, | ||
220 | "status_id": status_id, | ||
221 | "content": status_content, | ||
222 | "chat_id": message.chat_id, | ||
223 | } | ||
224 | jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) | ||
225 | if jobs: | ||
226 | jobs[0].data.append(msg_dict) | ||
227 | else: | ||
228 | # TODO | ||
229 | # media_group_sender won't end the callback context | ||
230 | # should add a job event listener | ||
231 | context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], | ||
232 | name=str(message.media_group_id)) | ||
233 | else: | ||
234 | file = await update.message.effective_attachment[-1].get_file() | ||
235 | img = io.BytesIO() | ||
236 | await file.download_to_memory(img) | ||
237 | img.seek(0) | ||
238 | |||
239 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
240 | mastodon_client.status_update( | ||
241 | status=status_content, | ||
242 | id=status_id, | ||
243 | media_ids=media["id"]) | ||
244 | |||
245 | await update.message.reply_text(text="Done", | ||
246 | reply_markup=telegram.ReplyKeyboardMarkup([ | ||
247 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
248 | [telegram.KeyboardButton(text="Setting")]])) | ||
249 | |||
250 | |||
251 | async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
252 | print(context.user_data) | ||
253 | await update.message.reply_text( | ||
254 | text="Done.", reply_markup=telegram.ReplyKeyboardMarkup([ | ||
255 | [telegram.KeyboardButton(text="Check in", request_location=True)], | ||
256 | [telegram.KeyboardButton(text="Setting")]]) | ||
257 | ) | ||
258 | return ConversationHandler.END | ||
259 | |||
260 | |||
83 | def main() -> None: | 261 | def main() -> None: |
84 | application = Application.builder().token(BOT_TOKEN).build() | 262 | application = Application.builder().token(BOT_TOKEN).build() |
85 | 263 | ||
86 | application.add_handler(CommandHandler("start", start)) | 264 | conv_handler = ConversationHandler( |
87 | application.add_handler(CallbackQueryHandler(button)) | 265 | entry_points=[ |
88 | application.add_handler(MessageHandler(filters.LOCATION & ~filters.COMMAND, checkin)) | 266 | CommandHandler("start", start), |
89 | application.add_handler(CommandHandler("help", help_command)) | 267 | MessageHandler(filters.LOCATION, checkin), |
268 | ], | ||
269 | states={ | ||
270 | LOCATION: [ | ||
271 | MessageHandler(filters.LOCATION, checkin), | ||
272 | ], | ||
273 | WAIT_LOC: [CallbackQueryHandler(process_callback)], | ||
274 | PHOTO: [MessageHandler(filters.PHOTO, photo), | ||
275 | CommandHandler("skip", skip_photo)], | ||
276 | }, | ||
277 | fallbacks=[CommandHandler("cancel", cancel)], | ||
278 | per_message=False, | ||
279 | allow_reentry=True, | ||
280 | ) | ||
281 | |||
282 | application.add_handler(conv_handler) | ||
90 | 283 | ||
91 | # Run the bot until the user presses Ctrl-C | 284 | # Run the bot until the user presses Ctrl-C |
92 | application.run_polling() | 285 | application.run_polling() |