From 25c5863df871629c74b3f8d678a0a8c9e786e227 Mon Sep 17 00:00:00 2001 From: clarkzjw Date: Mon, 20 Feb 2023 23:56:58 -0800 Subject: allow users to upload photos --- bot.py | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 196 insertions(+), 30 deletions(-) diff --git a/bot.py b/bot.py index 088eb0c..80d8e16 100644 --- a/bot.py +++ b/bot.py @@ -2,12 +2,8 @@ # pylint: disable=unused-argument, wrong-import-position # This program is dedicated to the public domain under the CC0 license. -""" -Basic example for a bot that uses inline keyboards. For an in-depth explanation, check out - https://github.com/python-telegram-bot/python-telegram-bot/wiki/InlineKeyboard-Example. -""" import logging - +import io import telegram.constants from telegram import __version__ as TG_VER @@ -22,14 +18,20 @@ if __version_info__ < (20, 0, 0, "alpha", 1): f"{TG_VER} version of this example, " f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" ) -from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update -from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters - +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardRemove +from telegram.ext import Application, CallbackQueryHandler, \ + CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext, JobQueue from config import BOT_TOKEN -from foursquare.poi import query_poi, get_poi_top_photo +from foursquare.poi import query_poi from dbstore.dbm_store import get_loc from toot import mastodon_client -import urllib.request +from typing import TypedDict, List, cast +from telegram import Update, InputMediaVideo, InputMediaPhoto + +scheduler = None +PRIVACY, TOOT = map(chr, range(8, 10)) + +WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL = range(5) # Enable logging logging.basicConfig( @@ -47,6 +49,12 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: "Start using this bot by sharing your location using Telegram context menu to it." await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) + await update.message.reply_text("Please choose", + reply_markup=telegram.ReplyKeyboardMarkup([ + [telegram.KeyboardButton(text="Check in", request_location=True)], + [telegram.KeyboardButton(text="Setting")]])) + + return LOCATION async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -60,31 +68,85 @@ async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text("Select a place", reply_markup=reply_markup) + return WAIT_LOC -async def button(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Parses the CallbackQuery and updates the message text.""" - query = update.callback_query +async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + query = update.callback_query await query.answer() - # TODO - # ask user whether they would like to attach their own photos - - poi = get_loc(query.data) - photo_url = get_poi_top_photo(query.data) - media_id = None - if photo_url is not None: - with urllib.request.urlopen(photo_url) as response: - data = response.read() - media = mastodon_client.media_post(data, mime_type="image/jpeg") - media_id = [media["id"]] + print(query.data) + context.user_data["fsq_id"] = query.data + await query.delete_message() + poi = get_loc(context.user_data["fsq_id"]) + media_id = [] + content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" status = mastodon_client.status_post( + content, + visibility="private", + media_ids=media_id) + + context.user_data["status_id"] = status["id"] + context.user_data["status_content"] = content + + print("status_id", context.user_data["status_id"]) + + await query.message.reply_text( + text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", + parse_mode=telegram.constants.ParseMode.MARKDOWN, + reply_markup=telegram.ReplyKeyboardMarkup([ + [telegram.KeyboardButton(text="Check in", request_location=True)], + [telegram.KeyboardButton(text="Setting")]]) + ) + + await query.message.reply_text("You can continue attaching photos, or press skip to continue", + reply_markup=telegram.ReplyKeyboardMarkup([ + [telegram.KeyboardButton(text="/skip")]])) + return PHOTO + + +async def action(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if update.message.text == "Check in": + await update.message.reply_text("Please share your location", + reply_markup=telegram.ReplyKeyboardRemove()) + elif update.message.text == "Setting": + await update.message.reply_text("Setting") + + +async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + keyboard = [[ + InlineKeyboardButton("Privacy", callback_data=PRIVACY), + ]] + + reply_markup = InlineKeyboardMarkup(keyboard) + await update.message.reply_text("Setting", reply_markup=reply_markup) + + +async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): + await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) + + fsq_id = context.user_data["fsq_id"] + poi = get_loc(context.user_data["fsq_id"]) + media_id = [] + + if context.user_data.get("photo") is not None: + media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") + media_id = [media["id"]] + # else: + # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) + # if photo_url is not None: + # with urllib.request.urlopen(photo_url) as response: + # data = response.read() + # media = mastodon_client.media_post(data, mime_type="image/jpeg") + # media_id = [media["id"]] + + mastodon_client.status_post( f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", visibility="private", media_ids=media_id) - await query.edit_message_text(text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", - parse_mode=telegram.constants.ParseMode.MARKDOWN) + await update.message.delete_message() + return ConversationHandler.END async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -92,13 +154,117 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No await update.message.reply_text("Use /start to test this bot.") +async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Cancels and ends the conversation.""" + user = update.message.from_user + logger.info("User %s canceled the conversation.", user.first_name) + await update.message.reply_text( + "Bye! I hope we can talk again some day.", reply_markup=ReplyKeyboardRemove() + ) + + return ConversationHandler.END + + +class MsgDict(TypedDict): + media_id: str + caption: str + status_id: int + content: str + chat_id: int + + +async def media_group_sender(context: CallbackContext): + context.job.data = cast(List[MsgDict], context.job.data) + + media_id = [] + chat_id = context.job.data[0].get("chat_id") + for msg_dict in context.job.data: + file = await context.bot.get_file(msg_dict.get("media_id")) + img = io.BytesIO() + await file.download_to_memory(img) + + img.seek(0) + + media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") + media_id.append(media["id"]) + + mastodon_client.status_update( + status=msg_dict.get("content"), + id=msg_dict.get("status_id"), + media_ids=media_id) + + await context.bot.send_message(chat_id=chat_id, text="Done", + reply_markup=telegram.ReplyKeyboardMarkup([ + [telegram.KeyboardButton(text="Check in", request_location=True)], + [telegram.KeyboardButton(text="Setting")]])) + + +async def photo(update: Update, context: CallbackContext): + """Stores the photo and asks for a location.""" + global scheduler + await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) + + status_id = context.user_data["status_id"] + status_content = context.user_data["status_content"] + + message = update.effective_message + context.user_data["media"] = [] + if message.media_group_id: + media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id + msg_dict = { + "media_id": media_id, + "caption": message.caption_html, + "status_id": status_id, + "content": status_content, + "chat_id": message.chat_id, + } + global jobname + jobname = str(message.media_group_id) + jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) + if jobs: + jobs[0].data.append(msg_dict) + else: + context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], + name=str(message.media_group_id)) + else: + file = await update.message.effective_attachment[-1].get_file() + img = io.BytesIO() + await file.download_to_memory(img) + img.seek(0) + context.user_data["photo"].append(img.read()) + + await process_location(update, context) + + +async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): + print(context.user_data) + reply_markup = telegram.ReplyKeyboardRemove() + await update.message.reply_text( + text="Done.", reply_markup=reply_markup + ) + return ConversationHandler.END + + def main() -> None: application = Application.builder().token(BOT_TOKEN).build() - application.add_handler(CommandHandler("start", start)) - application.add_handler(CallbackQueryHandler(button)) - application.add_handler(MessageHandler(filters.LOCATION & ~filters.COMMAND, checkin)) - application.add_handler(CommandHandler("help", help_command)) + conv_handler = ConversationHandler( + entry_points=[ + CommandHandler("start", start), + ], + states={ + LOCATION: [ + MessageHandler(filters.LOCATION, checkin), + ], + WAIT_LOC: [CallbackQueryHandler(process_callback)], + PHOTO: [MessageHandler(filters.PHOTO, photo), + CommandHandler("skip", skip_photo)], + }, + fallbacks=[CommandHandler("cancel", cancel)], + per_message=False, + ) + + application.add_handler(conv_handler) # Run the bot until the user presses Ctrl-C application.run_polling() -- cgit v1.2.3