#!/usr/bin/env python # pylint: disable=unused-argument, wrong-import-position # This program is dedicated to the public domain under the CC0 license. import logging import io import telegram.constants from telegram import __version__ as TG_VER try: from telegram import __version_info__ except ImportError: __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] if __version_info__ < (20, 0, 0, "alpha", 1): raise RuntimeError( f"This example is not compatible with your current PTB version {TG_VER}. To view the " f"{TG_VER} version of this example, " f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" ) from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardRemove from telegram.ext import Application, CallbackQueryHandler, \ CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext, JobQueue from config import BOT_TOKEN from foursquare.poi import query_poi from dbstore.dbm_store import get_loc from toot import mastodon_client from typing import TypedDict, List, cast from telegram import Update, InputMediaVideo, InputMediaPhoto scheduler = None PRIVACY, TOOT = map(chr, range(8, 10)) WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL = range(5) # Enable logging logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO ) logger = logging.getLogger(__name__) async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: hello = "Hello, this is `checkin.bot`. \n\n" \ "This is a Telegram bot with functionality similar to Foursquare Swarm, " \ "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \ "Aware of privacy concerns, this bot will not store your location data." \ "*Be safe and cautious when sharing your real time location on the web.* \n\n" \ "Start using this bot by sharing your location using Telegram context menu to it." await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) await update.message.reply_text("Please choose", reply_markup=telegram.ReplyKeyboardMarkup([ [telegram.KeyboardButton(text="Check in", request_location=True)], [telegram.KeyboardButton(text="Setting")]])) return LOCATION async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: keyboard = [] for poi in query_poi(update.message.location.latitude, update.message.location.longitude): keyboard.append([ InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), ]) reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text("Select a place", reply_markup=reply_markup) return WAIT_LOC async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: query = update.callback_query await query.answer() print(query.data) context.user_data["fsq_id"] = query.data await query.delete_message() poi = get_loc(context.user_data["fsq_id"]) media_id = [] content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" status = mastodon_client.status_post( content, visibility="private", media_ids=media_id) context.user_data["status_id"] = status["id"] context.user_data["status_content"] = content print("status_id", context.user_data["status_id"]) await query.message.reply_text( text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", parse_mode=telegram.constants.ParseMode.MARKDOWN, reply_markup=telegram.ReplyKeyboardMarkup([ [telegram.KeyboardButton(text="Check in", request_location=True)], [telegram.KeyboardButton(text="Setting")]]) ) await query.message.reply_text("You can continue attaching photos, or press skip to continue", reply_markup=telegram.ReplyKeyboardMarkup([ [telegram.KeyboardButton(text="/skip")]])) return PHOTO async def action(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: if update.message.text == "Check in": await update.message.reply_text("Please share your location", reply_markup=telegram.ReplyKeyboardRemove()) elif update.message.text == "Setting": await update.message.reply_text("Setting") async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: keyboard = [[ InlineKeyboardButton("Privacy", callback_data=PRIVACY), ]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text("Setting", reply_markup=reply_markup) async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) fsq_id = context.user_data["fsq_id"] poi = get_loc(context.user_data["fsq_id"]) media_id = [] if context.user_data.get("photo") is not None: media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") media_id = [media["id"]] # else: # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) # if photo_url is not None: # with urllib.request.urlopen(photo_url) as response: # data = response.read() # media = mastodon_client.media_post(data, mime_type="image/jpeg") # media_id = [media["id"]] mastodon_client.status_post( f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", visibility="private", media_ids=media_id) await update.message.delete_message() return ConversationHandler.END async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Displays info on how to use the bot.""" await update.message.reply_text("Use /start to test this bot.") async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: """Cancels and ends the conversation.""" user = update.message.from_user logger.info("User %s canceled the conversation.", user.first_name) await update.message.reply_text( "Bye! I hope we can talk again some day.", reply_markup=ReplyKeyboardRemove() ) return ConversationHandler.END class MsgDict(TypedDict): media_id: str caption: str status_id: int content: str chat_id: int async def media_group_sender(context: CallbackContext): context.job.data = cast(List[MsgDict], context.job.data) media_id = [] chat_id = context.job.data[0].get("chat_id") for msg_dict in context.job.data: file = await context.bot.get_file(msg_dict.get("media_id")) img = io.BytesIO() await file.download_to_memory(img) img.seek(0) media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") media_id.append(media["id"]) mastodon_client.status_update( status=msg_dict.get("content"), id=msg_dict.get("status_id"), media_ids=media_id) await context.bot.send_message(chat_id=chat_id, text="Done", reply_markup=telegram.ReplyKeyboardMarkup([ [telegram.KeyboardButton(text="Check in", request_location=True)], [telegram.KeyboardButton(text="Setting")]])) async def photo(update: Update, context: CallbackContext): """Stores the photo and asks for a location.""" global scheduler await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) status_id = context.user_data["status_id"] status_content = context.user_data["status_content"] message = update.effective_message context.user_data["media"] = [] if message.media_group_id: media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id msg_dict = { "media_id": media_id, "caption": message.caption_html, "status_id": status_id, "content": status_content, "chat_id": message.chat_id, } global jobname jobname = str(message.media_group_id) jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) if jobs: jobs[0].data.append(msg_dict) else: # TODO # media_group_sender won't end the callback context # should add a job event listener context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], name=str(message.media_group_id)) else: file = await update.message.effective_attachment[-1].get_file() img = io.BytesIO() await file.download_to_memory(img) img.seek(0) context.user_data["photo"].append(img.read()) await process_location(update, context) async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): print(context.user_data) reply_markup = telegram.ReplyKeyboardRemove() await update.message.reply_text( text="Done.", reply_markup=reply_markup ) return ConversationHandler.END def main() -> None: application = Application.builder().token(BOT_TOKEN).build() conv_handler = ConversationHandler( entry_points=[ CommandHandler("start", start), ], states={ LOCATION: [ MessageHandler(filters.LOCATION, checkin), ], WAIT_LOC: [CallbackQueryHandler(process_callback)], PHOTO: [MessageHandler(filters.PHOTO, photo), CommandHandler("skip", skip_photo)], }, fallbacks=[CommandHandler("cancel", cancel)], per_message=False, ) application.add_handler(conv_handler) # Run the bot until the user presses Ctrl-C application.run_polling() if __name__ == "__main__": main()