From 3a76618d0f46c7e28641b74a869544661c504e21 Mon Sep 17 00:00:00 2001 From: clarkzjw Date: Tue, 21 Feb 2023 23:32:21 -0800 Subject: refactoring code --- callback.py | 266 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 callback.py (limited to 'callback.py') diff --git a/callback.py b/callback.py new file mode 100644 index 0000000..8c80028 --- /dev/null +++ b/callback.py @@ -0,0 +1,266 @@ +import io +from foursquare.poi import OSM_ENDPOINT +from foursquare.poi import query_poi +from dbstore.dbm_store import get_loc +from toot import mastodon_client +from command import * +from telegram import __version__ as TG_VER +from typing import cast, List +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton +from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \ + ConversationHandler, CallbackContext +from telegram.constants import ParseMode, ChatAction +from telegram.error import BadRequest +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardRemove + + +async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query = update.callback_query + await query.answer() + + await query.delete_message() + await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) + + return ConversationHandler.END + + +async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + if update.message.venue is not None: + fsq_id = update.message.venue.foursquare_id + title = update.message.venue.title + context.user_data["fsq_id"] = fsq_id + context.user_data["title"] = title + context.user_data["latitude"] = update.message.venue.location.latitude + context.user_data["longitude"] = update.message.venue.location.longitude + + poi = get_loc(context.user_data["fsq_id"]) + media_id = [] + content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" + status = mastodon_client.status_post( + content, + visibility="private", + media_ids=media_id) + + context.user_data["status_id"] = status["id"] + context.user_data["status_content"] = content + + print("status_id", context.user_data["status_id"]) + + await update.message.reply_text( + text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}", + parse_mode=ParseMode.MARKDOWN, + ) + + prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id + + return ADD_COMMENT + else: + context.user_data["latitude"] = update.message.location.latitude + context.user_data["longitude"] = update.message.location.longitude + + await update.message.reply_text("Searching...", reply_markup=ReplyKeyboardRemove()) + prompt_msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU) + + context.user_data[PROMPT_LOCATION_KEYWORD] = prompt_msg.message_id + return LOCATION_SEARCH_KEYWORD + + +async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + loc = update.effective_message.text + osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"]) + media_id = [] + content = f"I'm at {loc}, {osm_url}" + status = mastodon_client.status_post( + content, + visibility="private", + media_ids=media_id) + + context.user_data["status_id"] = status["id"] + context.user_data["status_content"] = content + + print("status_id", context.user_data["status_id"]) + + await update.message.reply_text( + text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}", + parse_mode=ParseMode.MARKDOWN, + ) + + prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id + + return ADD_COMMENT + + +async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query = update.callback_query + await query.answer() + context.user_data["fsq_id"] = query.data + + await query.delete_message() + + poi = get_loc(context.user_data["fsq_id"]) + media_id = [] + content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" + status = mastodon_client.status_post( + content, + visibility="private", + media_ids=media_id) + + context.user_data["status_id"] = status["id"] + context.user_data["status_content"] = content + + print("status_id", context.user_data["status_id"]) + + await query.message.reply_text( + text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", + parse_mode=ParseMode.MARKDOWN, + ) + + prompt_attach_comment_msg = await query.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id + + return ADD_COMMENT + + +async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_LOCATION_KEYWORD]) + + location_search = update.effective_message.text + latitude = context.user_data["latitude"] + longitude = context.user_data["longitude"] + + keyboard = [] + poi_result = query_poi(location_search, latitude, longitude) + if len(poi_result) == 0: + poi_result = query_poi("", latitude, longitude) + + for poi in poi_result: + keyboard.append([ + InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), + ]) + + if len(keyboard) == 0: + await update.message.reply_text(PROMPT_NO_NEARBY_POI) + return LOCATION_CONFIRMATION + else: + reply_markup = InlineKeyboardMarkup(keyboard) + context.user_data["location_search"] = location_search + await update.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup) + + return LOCATION_CONFIRMATION + + +async def callback_skip_location_keyword(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query = update.callback_query + await query.answer() + + await query.message.delete() + latitude = context.user_data["latitude"] + longitude = context.user_data["longitude"] + + keyboard = [] + + for poi in query_poi("", latitude, longitude): + keyboard.append([ + InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), + ]) + + reply_markup = InlineKeyboardMarkup(keyboard) + await query.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup) + + return LOCATION_CONFIRMATION + + +async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT]) + comment = update.effective_message.text + + mastodon_client.status_update( + status=f"{comment} " + context.user_data["status_content"], + id=context.user_data["status_id"]) + + context.user_data["status_content"] = f"{comment} " + context.user_data["status_content"] + prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id + + return ADD_MEDIA + + +async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT]) + prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id + return ADD_MEDIA + + +async def process_media_group(context: CallbackContext): + context.job.data = cast(List[MsgDict], context.job.data) + + media_id = [] + chat_id = context.job.data[0].get("chat_id") + for msg_dict in context.job.data: + if len(media_id) >= 4: + await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU) + return + + file = await context.bot.get_file(msg_dict.get("media_id")) + img = io.BytesIO() + await file.download_to_memory(img) + + img.seek(0) + + media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") + media_id.append(media["id"]) + + mastodon_client.status_update( + status=msg_dict.get("content"), + id=msg_dict.get("status_id"), + media_ids=media_id) + + await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU) + + +async def callback_add_media(update: Update, context: CallbackContext): + await update.message.reply_chat_action(ChatAction.TYPING) + + try: + await context.bot.delete_message(chat_id=update.message.chat_id, + message_id=context.user_data[PROMPT_ADD_MEDIA]) + except BadRequest as e: + if "not found" in str(e.message): + pass + + status_id = context.user_data["status_id"] + status_content = context.user_data["status_content"] + + message = update.effective_message + context.user_data["media"] = [] + if message.media_group_id: + media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id + msg_dict = { + "media_id": media_id, + "caption": message.caption_html, + "status_id": status_id, + "content": status_content, + "chat_id": message.chat_id, + } + jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) + if jobs: + jobs[0].data.append(msg_dict) + else: + context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT, + data=[msg_dict], name=str(message.media_group_id)) + else: + file = await update.message.effective_attachment[-1].get_file() + img = io.BytesIO() + await file.download_to_memory(img) + img.seek(0) + + media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") + mastodon_client.status_update( + status=status_content, + id=status_id, + media_ids=media["id"]) + + await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) -- cgit v1.2.3