From efc952972f97f98814b2e9a49648766101cd0725 Mon Sep 17 00:00:00 2001 From: clarkzjw Date: Tue, 21 Feb 2023 23:12:26 -0800 Subject: renaming functions --- bot.py | 278 ++++++++++++++++++------------------------------------- config.py | 2 + prompt/string.py | 12 +++ 3 files changed, 104 insertions(+), 188 deletions(-) create mode 100644 prompt/string.py diff --git a/bot.py b/bot.py index c7a3180..f238e75 100644 --- a/bot.py +++ b/bot.py @@ -1,12 +1,7 @@ #!/usr/bin/env python -# pylint: disable=unused-argument, wrong-import-position -# This program is dedicated to the public domain under the CC0 license. -import logging -import io import telegram.constants from telegram import __version__ as TG_VER -from pprint import pprint try: from telegram import __version_info__ @@ -20,46 +15,44 @@ if __version_info__ < (20, 0, 0, "alpha", 1): f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" ) +import logging +from pprint import pprint +import io from foursquare.poi import OSM_ENDPOINT from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \ ConversationHandler, CallbackContext -from config import BOT_TOKEN +from config import BOT_TOKEN, MEDIA_GROUP_TIMEOUT from foursquare.poi import query_poi from dbstore.dbm_store import get_loc from toot import mastodon_client from typing import TypedDict, List, cast +from prompt.string import * -scheduler = None -PRIVACY, TOOT = map(chr, range(8, 10)) - -WAIT_LOC, LOCATION, LOCATION_SEARCH, PHOTO, PROCESS_PHOTO, COMMENT, SETTING = range(7) -# Enable logging logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO ) logger = logging.getLogger(__name__) +WAIT_LOCATION, LOCATION_SEARCH_KEYWORD, LOCATION_CONFIRMATION, ADD_MEDIA, ADD_COMMENT = range(5) + MAIN_MENU = ReplyKeyboardMarkup([ [KeyboardButton(text="Check-in here", request_location=True)], - # [KeyboardButton(text="/cancel")], - # [KeyboardButton(text="/setting")] ]) -SKIP_LOCATION_SEARCH = "skip_location_search" - -SKIP_MENU = InlineKeyboardMarkup([ +SKIP_LOCATION_SEARCH = CALLBACK_SKIP +INLINE_SKIP_MENU = InlineKeyboardMarkup([ [telegram.InlineKeyboardButton("Skip", callback_data=SKIP_LOCATION_SEARCH)] ]) -# SETTING_MENU = InlineKeyboardMarkup( -# [ -# [InlineKeyboardButton(text="/tos")], -# [InlineKeyboardButton(text="/back")], -# ] -# ) +class MsgDict(TypedDict): + media_id: str + caption: str + status_id: int + content: str + chat_id: int async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: @@ -71,13 +64,12 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: "Start using this bot by sharing your location using Telegram context menu to it." await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) - await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU) + await update.message.reply_text(PROMPT_CHOOSE_ACTION, reply_markup=MAIN_MENU) - return LOCATION + return WAIT_LOCATION -async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - pprint(update.message.location) +async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: if update.message.venue is not None: fsq_id = update.message.venue.foursquare_id title = update.message.venue.title @@ -104,28 +96,22 @@ async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: parse_mode=telegram.constants.ParseMode.MARKDOWN, ) - prompt_attach_comment_msg = await update.message.reply_text( - "You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU) - context.user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id + prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id - return COMMENT + return ADD_COMMENT else: context.user_data["latitude"] = update.message.location.latitude context.user_data["longitude"] = update.message.location.longitude await update.message.reply_text("Searching...", reply_markup=telegram.ReplyKeyboardRemove()) - prompt_msg = await update.message.reply_text("You can input location search keywords or press skip", - reply_markup=SKIP_MENU) + prompt_msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU) - context.user_data["prompt_msg_id"] = prompt_msg.message_id - return LOCATION_SEARCH + context.user_data[PROMPT_LOCATION_KEYWORD] = prompt_msg.message_id + return LOCATION_SEARCH_KEYWORD -async def manual_location_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - # query = update.callback_query - # await query.answer() - # await query.delete_message() - +async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: loc = update.effective_message.text osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"]) media_id = [] @@ -143,22 +129,19 @@ async def manual_location_process_callback(update: Update, context: ContextTypes await update.message.reply_text( text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}", parse_mode=telegram.constants.ParseMode.MARKDOWN, - # reply_markup=MAIN_MENU ) - prompt_attach_comment_msg = await update.message.reply_text( - "You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU) - context.user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id + prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id - return COMMENT + return ADD_COMMENT -async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - print("process_callback") +async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query = update.callback_query await query.answer() - print(query.data) context.user_data["fsq_id"] = query.data + await query.delete_message() poi = get_loc(context.user_data["fsq_id"]) @@ -177,18 +160,16 @@ async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) - await query.message.reply_text( text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", parse_mode=telegram.constants.ParseMode.MARKDOWN, - # reply_markup=MAIN_MENU ) - prompt_attach_comment_msg = await query.message.reply_text( - "You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU) - context.user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id + prompt_attach_comment_msg = await query.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id - return COMMENT + return ADD_COMMENT -async def location_search_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_msg_id"]) +async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_LOCATION_KEYWORD]) location_search = update.effective_message.text latitude = context.user_data["latitude"] @@ -199,26 +180,25 @@ async def location_search_callback(update: Update, context: ContextTypes.DEFAULT if len(poi_result) == 0: poi_result = query_poi("", latitude, longitude) - # for poi in poi_result: - # keyboard.append([ - # InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), - # ]) + for poi in poi_result: + keyboard.append([ + InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), + ]) if len(keyboard) == 0: - await update.message.reply_text("No nearby places found. You can input location name manually") - return WAIT_LOC + await update.message.reply_text(PROMPT_NO_NEARBY_POI) + return LOCATION_CONFIRMATION else: reply_markup = InlineKeyboardMarkup(keyboard) context.user_data["location_search"] = location_search - await update.message.reply_text("Where are you? ", reply_markup=reply_markup) + await update.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup) - return WAIT_LOC + return LOCATION_CONFIRMATION -async def skip_location_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: +async def callback_skip_location_keyword(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query = update.callback_query await query.answer() - print("skip: ", query.data) await query.message.delete() latitude = context.user_data["latitude"] @@ -232,13 +212,13 @@ async def skip_location_search(update: Update, context: ContextTypes.DEFAULT_TYP ]) reply_markup = InlineKeyboardMarkup(keyboard) - await query.message.reply_text("Where are you? ", reply_markup=reply_markup) + await query.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup) - return WAIT_LOC + return LOCATION_CONFIRMATION -async def comment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_attach_comment_msg_id"]) +async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT]) comment = update.effective_message.text mastodon_client.status_update( @@ -246,110 +226,39 @@ async def comment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) - id=context.user_data["status_id"]) context.user_data["status_content"] = f"{comment} " + context.user_data["status_content"] - prompt_attach_photo_msg = await update.message.reply_text( - "You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU) - context.user_data["prompt_attach_photo_msg_id"] = prompt_attach_photo_msg.message_id - - return PHOTO - - -async def skip_comment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_attach_comment_msg_id"]) - prompt_attach_photo_msg = await update.message.reply_text( - "You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU) - context.user_data["prompt_attach_photo_msg_id"] = prompt_attach_photo_msg.message_id - return PHOTO - - -# async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: -# await update.message.reply_text("TOS", reply_markup=MAIN_MENU) - - -# async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: -# await update.message.reply_text("Setting", reply_markup=SETTING_MENU) -# return SETTING - - -# async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): -# await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU) -# return ConversationHandler.END - - -# async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): -# await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) -# -# fsq_id = context.user_data["fsq_id"] -# poi = get_loc(context.user_data["fsq_id"]) -# media_id = [] -# -# if context.user_data.get("photo") is not None: -# media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") -# media_id = [media["id"]] -# # else: -# # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) -# # if photo_url is not None: -# # with urllib.request.urlopen(photo_url) as response: -# # data = response.read() -# # media = mastodon_client.media_post(data, mime_type="image/jpeg") -# # media_id = [media["id"]] -# -# mastodon_client.status_post( -# f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", -# visibility="private", -# media_ids=media_id) -# -# await update.message.delete() -# return ConversationHandler.END + prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id + return ADD_MEDIA -async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Displays info on how to use the bot.""" - await update.message.reply_text("Use /start to test this bot.") +async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT]) + prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU) + context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id + return ADD_MEDIA -# async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: -# """Cancels and ends the conversation.""" -# user = update.message.from_user -# logger.info("User %s canceled the conversation.", user.first_name) -# await update.message.reply_text( -# text="Setting canceled.", -# # "Bye! I hope we can talk again some day.", -# reply_markup=MAIN_MENU -# ) -# -# return ConversationHandler.END + +async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await update.message.reply_text(PROMPT_HELP) async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Cancels and ends the conversation.""" - user = update.message.from_user - logger.info("User %s canceled the conversation.", user.first_name) - await update.message.reply_text( - text="Canceled.", - # "Bye! I hope we can talk again some day.", - reply_markup=MAIN_MENU - ) + await update.message.reply_text(text=PROMPT_CANCELED, reply_markup=MAIN_MENU) return ConversationHandler.END -class MsgDict(TypedDict): - media_id: str - caption: str - status_id: int - content: str - chat_id: int - - -async def media_group_sender(context: CallbackContext): +async def process_media_group(context: CallbackContext): context.job.data = cast(List[MsgDict], context.job.data) media_id = [] chat_id = context.job.data[0].get("chat_id") for msg_dict in context.job.data: if len(media_id) >= 4: - print("Cannot attach more than 4 photos") - break + await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU) + return + file = await context.bot.get_file(msg_dict.get("media_id")) img = io.BytesIO() await file.download_to_memory(img) @@ -364,20 +273,17 @@ async def media_group_sender(context: CallbackContext): id=msg_dict.get("status_id"), media_ids=media_id) - await context.bot.send_message(chat_id=chat_id, text="Done", - reply_markup=MAIN_MENU - ) + await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU) -async def photo(update: Update, context: CallbackContext): - """Stores the photo and asks for a location.""" +async def callback_add_media(update: Update, context: CallbackContext): await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) try: - await context.bot.delete_message(chat_id=update.message.chat_id, message_id=context.user_data["prompt_attach_photo_msg_id"]) + await context.bot.delete_message(chat_id=update.message.chat_id, + message_id=context.user_data[PROMPT_ADD_MEDIA]) except telegram.error.BadRequest as e: if "not found" in str(e.message): - print("prompt_attach_photo_msg already deleted") pass status_id = context.user_data["status_id"] @@ -398,8 +304,8 @@ async def photo(update: Update, context: CallbackContext): if jobs: jobs[0].data.append(msg_dict) else: - context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], - name=str(message.media_group_id)) + context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT, + data=[msg_dict], name=str(message.media_group_id)) else: file = await update.message.effective_attachment[-1].get_file() img = io.BytesIO() @@ -412,19 +318,16 @@ async def photo(update: Update, context: CallbackContext): id=status_id, media_ids=media["id"]) - await update.message.reply_text(text="Done", - reply_markup=MAIN_MENU - ) + await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) -async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: +async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query = update.callback_query await query.answer() await query.delete_message() - await query.message.reply_text( - text="Done.", reply_markup=MAIN_MENU - ) + await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) + return ConversationHandler.END @@ -434,35 +337,34 @@ def main() -> None: checkin_handler = ConversationHandler( entry_points=[ CommandHandler("start", start), - MessageHandler(filters.LOCATION, checkin), + MessageHandler(filters.LOCATION, callback_location_sharing), ], states={ - LOCATION: [ - MessageHandler(filters.LOCATION, checkin), + WAIT_LOCATION: [ + MessageHandler(filters.LOCATION, callback_location_sharing), ], - LOCATION_SEARCH: [ - MessageHandler(filters.TEXT, location_search_callback), - CallbackQueryHandler(skip_location_search), + LOCATION_SEARCH_KEYWORD: [ + MessageHandler(filters.TEXT, callback_location_keyword_search), + CallbackQueryHandler(callback_skip_location_keyword), ], - WAIT_LOC: [ - CallbackQueryHandler(process_callback), - MessageHandler(filters.TEXT, manual_location_process_callback) + LOCATION_CONFIRMATION: [ + CallbackQueryHandler(callback_location_confirmation), + MessageHandler(filters.TEXT, callback_manual_location) ], - COMMENT: [ - MessageHandler(filters.TEXT, comment_callback), - CallbackQueryHandler(skip_comment_callback), + ADD_COMMENT: [ + MessageHandler(filters.TEXT, callback_add_comment), + CallbackQueryHandler(callback_skip_comment), ], - PHOTO: [MessageHandler(filters.PHOTO, photo), - CallbackQueryHandler(skip_photo)], + ADD_MEDIA: [MessageHandler(filters.PHOTO, callback_add_media), + CallbackQueryHandler(callback_skip_media)], }, fallbacks=[CommandHandler("cancel", cancel)], per_message=False, allow_reentry=True, ) - application.add_handler(checkin_handler, 1) - - # Run the bot until the user presses Ctrl-C + application.add_handler(CommandHandler("help", help_command)) + application.add_handler(checkin_handler) application.run_polling() diff --git a/config.py b/config.py index 2469d0f..3356d11 100644 --- a/config.py +++ b/config.py @@ -12,3 +12,5 @@ TOOT_API_BASE_URL = config["TOOT"]["API_BASE_URL"] TOOT_CLIENT_ID = config["TOOT"]["CLIENT_ID"] TOOT_CLIENT_SECRET = config["TOOT"]["CLIENT_SECRET"] TOOT_ACCESS_TOKEN = config["TOOT"]["ACCESS_TOKEN"] + +MEDIA_GROUP_TIMEOUT = 3 diff --git a/prompt/string.py b/prompt/string.py new file mode 100644 index 0000000..af3363a --- /dev/null +++ b/prompt/string.py @@ -0,0 +1,12 @@ +PROMPT_CHOOSE_ACTION = "Use bot keyboard to choose an action" +PROMPT_ADD_COMMENT = "You can continue adding comments, or press skip" +PROMPT_ADD_MEDIA = "You can continue adding photos, or press skip" +PROMPT_LOCATION_KEYWORD = "You can input location search keywords or press skip" +PROMPT_NO_NEARBY_POI = "No nearby places found. You can input location name manually" +PROMPT_CHOOSE_POI_FROM_LIST = "Where are you?" +PROMPT_HELP = "Use /start to test this bot." +PROMPT_CANCELED = "Canceled" +PROMPT_MAX_PHOTO_REACHED = "Cannot attach more than 4 medias, only first 4 will be posted" +PROMPT_DONE = "Done" + +CALLBACK_SKIP = "Skip" -- cgit v1.2.3