From 3a76618d0f46c7e28641b74a869544661c504e21 Mon Sep 17 00:00:00 2001 From: clarkzjw Date: Tue, 21 Feb 2023 23:32:21 -0800 Subject: refactoring code --- bot.py | 329 +---------------------------------------------------------------- 1 file changed, 4 insertions(+), 325 deletions(-) (limited to 'bot.py') diff --git a/bot.py b/bot.py index f238e75..f2be049 100644 --- a/bot.py +++ b/bot.py @@ -1,342 +1,21 @@ #!/usr/bin/env python -import telegram.constants -from telegram import __version__ as TG_VER - -try: - from telegram import __version_info__ -except ImportError: - __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] - -if __version_info__ < (20, 0, 0, "alpha", 1): - raise RuntimeError( - f"This example is not compatible with your current PTB version {TG_VER}. To view the " - f"{TG_VER} version of this example, " - f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" - ) - import logging -from pprint import pprint -import io -from foursquare.poi import OSM_ENDPOINT -from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton -from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \ - ConversationHandler, CallbackContext -from config import BOT_TOKEN, MEDIA_GROUP_TIMEOUT -from foursquare.poi import query_poi -from dbstore.dbm_store import get_loc -from toot import mastodon_client -from typing import TypedDict, List, cast -from prompt.string import * - +from callback import * +from telegram.ext import Application, CallbackQueryHandler, CommandHandler, MessageHandler, filters, ConversationHandler logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO ) logger = logging.getLogger(__name__) -WAIT_LOCATION, LOCATION_SEARCH_KEYWORD, LOCATION_CONFIRMATION, ADD_MEDIA, ADD_COMMENT = range(5) - -MAIN_MENU = ReplyKeyboardMarkup([ - [KeyboardButton(text="Check-in here", request_location=True)], -]) - -SKIP_LOCATION_SEARCH = CALLBACK_SKIP -INLINE_SKIP_MENU = InlineKeyboardMarkup([ - [telegram.InlineKeyboardButton("Skip", callback_data=SKIP_LOCATION_SEARCH)] -]) - - -class MsgDict(TypedDict): - media_id: str - caption: str - status_id: int - content: str - chat_id: int - - -async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - hello = "Hello, this is `checkin.bot`. \n\n" \ - "This is a Telegram bot with functionality similar to Foursquare Swarm, " \ - "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \ - "Aware of privacy concerns, this bot will not store your location data." \ - "*Be safe and cautious when sharing your real time location on the web.* \n\n" \ - "Start using this bot by sharing your location using Telegram context menu to it." - - await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) - await update.message.reply_text(PROMPT_CHOOSE_ACTION, reply_markup=MAIN_MENU) - - return WAIT_LOCATION - - -async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - if update.message.venue is not None: - fsq_id = update.message.venue.foursquare_id - title = update.message.venue.title - context.user_data["fsq_id"] = fsq_id - context.user_data["title"] = title - context.user_data["latitude"] = update.message.venue.location.latitude - context.user_data["longitude"] = update.message.venue.location.longitude - - poi = get_loc(context.user_data["fsq_id"]) - media_id = [] - content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" - status = mastodon_client.status_post( - content, - visibility="private", - media_ids=media_id) - - context.user_data["status_id"] = status["id"] - context.user_data["status_content"] = content - - print("status_id", context.user_data["status_id"]) - - await update.message.reply_text( - text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - ) - - prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) - context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id - - return ADD_COMMENT - else: - context.user_data["latitude"] = update.message.location.latitude - context.user_data["longitude"] = update.message.location.longitude - - await update.message.reply_text("Searching...", reply_markup=telegram.ReplyKeyboardRemove()) - prompt_msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU) - - context.user_data[PROMPT_LOCATION_KEYWORD] = prompt_msg.message_id - return LOCATION_SEARCH_KEYWORD - - -async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - loc = update.effective_message.text - osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"]) - media_id = [] - content = f"I'm at {loc}, {osm_url}" - status = mastodon_client.status_post( - content, - visibility="private", - media_ids=media_id) - - context.user_data["status_id"] = status["id"] - context.user_data["status_content"] = content - - print("status_id", context.user_data["status_id"]) - - await update.message.reply_text( - text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - ) - - prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) - context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id - - return ADD_COMMENT - - -async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query = update.callback_query - await query.answer() - context.user_data["fsq_id"] = query.data - - await query.delete_message() - - poi = get_loc(context.user_data["fsq_id"]) - media_id = [] - content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" - status = mastodon_client.status_post( - content, - visibility="private", - media_ids=media_id) - - context.user_data["status_id"] = status["id"] - context.user_data["status_content"] = content - - print("status_id", context.user_data["status_id"]) - - await query.message.reply_text( - text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - ) - - prompt_attach_comment_msg = await query.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) - context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id - - return ADD_COMMENT - - -async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_LOCATION_KEYWORD]) - - location_search = update.effective_message.text - latitude = context.user_data["latitude"] - longitude = context.user_data["longitude"] - - keyboard = [] - poi_result = query_poi(location_search, latitude, longitude) - if len(poi_result) == 0: - poi_result = query_poi("", latitude, longitude) - - for poi in poi_result: - keyboard.append([ - InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), - ]) - - if len(keyboard) == 0: - await update.message.reply_text(PROMPT_NO_NEARBY_POI) - return LOCATION_CONFIRMATION - else: - reply_markup = InlineKeyboardMarkup(keyboard) - context.user_data["location_search"] = location_search - await update.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup) - - return LOCATION_CONFIRMATION - - -async def callback_skip_location_keyword(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query = update.callback_query - await query.answer() - - await query.message.delete() - latitude = context.user_data["latitude"] - longitude = context.user_data["longitude"] - - keyboard = [] - - for poi in query_poi("", latitude, longitude): - keyboard.append([ - InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), - ]) - - reply_markup = InlineKeyboardMarkup(keyboard) - await query.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup) - - return LOCATION_CONFIRMATION - - -async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT]) - comment = update.effective_message.text - - mastodon_client.status_update( - status=f"{comment} " + context.user_data["status_content"], - id=context.user_data["status_id"]) - - context.user_data["status_content"] = f"{comment} " + context.user_data["status_content"] - prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU) - context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id - - return ADD_MEDIA - - -async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT]) - prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU) - context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id - return ADD_MEDIA - - -async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await update.message.reply_text(PROMPT_HELP) - - -async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await update.message.reply_text(text=PROMPT_CANCELED, reply_markup=MAIN_MENU) - - return ConversationHandler.END - - -async def process_media_group(context: CallbackContext): - context.job.data = cast(List[MsgDict], context.job.data) - - media_id = [] - chat_id = context.job.data[0].get("chat_id") - for msg_dict in context.job.data: - if len(media_id) >= 4: - await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU) - return - - file = await context.bot.get_file(msg_dict.get("media_id")) - img = io.BytesIO() - await file.download_to_memory(img) - - img.seek(0) - - media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") - media_id.append(media["id"]) - - mastodon_client.status_update( - status=msg_dict.get("content"), - id=msg_dict.get("status_id"), - media_ids=media_id) - - await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU) - - -async def callback_add_media(update: Update, context: CallbackContext): - await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) - - try: - await context.bot.delete_message(chat_id=update.message.chat_id, - message_id=context.user_data[PROMPT_ADD_MEDIA]) - except telegram.error.BadRequest as e: - if "not found" in str(e.message): - pass - - status_id = context.user_data["status_id"] - status_content = context.user_data["status_content"] - - message = update.effective_message - context.user_data["media"] = [] - if message.media_group_id: - media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id - msg_dict = { - "media_id": media_id, - "caption": message.caption_html, - "status_id": status_id, - "content": status_content, - "chat_id": message.chat_id, - } - jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) - if jobs: - jobs[0].data.append(msg_dict) - else: - context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT, - data=[msg_dict], name=str(message.media_group_id)) - else: - file = await update.message.effective_attachment[-1].get_file() - img = io.BytesIO() - await file.download_to_memory(img) - img.seek(0) - - media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") - mastodon_client.status_update( - status=status_content, - id=status_id, - media_ids=media["id"]) - - await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) - - -async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query = update.callback_query - await query.answer() - - await query.delete_message() - await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) - - return ConversationHandler.END - def main() -> None: application = Application.builder().token(BOT_TOKEN).build() checkin_handler = ConversationHandler( entry_points=[ - CommandHandler("start", start), + CommandHandler("start", start_command), MessageHandler(filters.LOCATION, callback_location_sharing), ], states={ @@ -358,7 +37,7 @@ def main() -> None: ADD_MEDIA: [MessageHandler(filters.PHOTO, callback_add_media), CallbackQueryHandler(callback_skip_media)], }, - fallbacks=[CommandHandler("cancel", cancel)], + fallbacks=[CommandHandler("cancel", cancel_command)], per_message=False, allow_reentry=True, ) -- cgit v1.2.3