#!/usr/bin/env python
"""Telegram bot that checks a user in at a place and posts it to Mastodon.

The conversation is: share a location (or venue) -> optionally search for a
nearby point of interest by keyword -> confirm a place (or type one) ->
optionally add a comment -> optionally attach photos.  Statuses are posted
and edited through ``mastodon_client``.
"""
import telegram.constants
from telegram import __version__ as TG_VER

try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Fail early with a readable message before importing the v20-only names below.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
import logging
from pprint import pprint
import io
from foursquare.poi import OSM_ENDPOINT
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \
    ConversationHandler, CallbackContext
from telegram.helpers import escape_markdown
from config import BOT_TOKEN, MEDIA_GROUP_TIMEOUT
from foursquare.poi import query_poi
from dbstore.dbm_store import get_loc
from toot import mastodon_client
from typing import TypedDict, List, Optional, cast
# Supplies the PROMPT_* strings and CALLBACK_SKIP used throughout this module.
from prompt.string import *

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)

# Conversation states, in the order the user normally walks through them.
WAIT_LOCATION, LOCATION_SEARCH_KEYWORD, LOCATION_CONFIRMATION, ADD_MEDIA, ADD_COMMENT = range(5)

# Reply keyboard whose single button asks Telegram to send the user's location.
MAIN_MENU = ReplyKeyboardMarkup([
    [KeyboardButton(text="Check-in here", request_location=True)],
])

SKIP_LOCATION_SEARCH = CALLBACK_SKIP

# One-button inline keyboard attached to every optional prompt.
INLINE_SKIP_MENU = InlineKeyboardMarkup([
    [telegram.InlineKeyboardButton("Skip", callback_data=SKIP_LOCATION_SEARCH)]
])


class MsgDict(TypedDict):
    """One queued photo of a Telegram media group, plus the status it belongs to."""

    media_id: str  # Telegram file_id of the photo/attachment
    caption: str  # message.caption_html of the photo message
    status_id: int  # Mastodon status to attach the media to
    content: str  # current text of that status
    chat_id: int  # chat to notify once the group has been processed


def _checkin_text(poi) -> str:
    """Return the status text for a point of interest returned by ``get_loc``."""
    return f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"


def _poi_keyboard(pois) -> list:
    """Return one inline-keyboard row per POI, using its ``fsq_id`` as callback data."""
    return [
        [InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"])]
        for poi in pois
    ]


async def _publish_checkin(message: telegram.Message, context: ContextTypes.DEFAULT_TYPE,
                           content: str, summary: str) -> int:
    """Post ``content`` to Mastodon, confirm it in the chat and ask for a comment.

    Args:
        message: Telegram message used to send the replies.
        context: Handler context; ``status_id``, ``status_content`` and the id of
            the comment prompt are stored in ``context.user_data``.
        content: Text of the status to post.
        summary: First line of the confirmation reply.  It is sent with legacy
            Markdown parsing, so callers must escape any untrusted text in it.

    Returns:
        The ``ADD_COMMENT`` conversation state.
    """
    status = mastodon_client.status_post(content, visibility="private", media_ids=[])
    context.user_data["status_id"] = status["id"]
    context.user_data["status_content"] = content
    logger.info("status_id %s", context.user_data["status_id"])

    await message.reply_text(
        # The URL is escaped because an unmatched "_" breaks legacy Markdown parsing.
        text=f"{summary}\nPosted to Mastodon: {escape_markdown(str(status['url']))}",
        parse_mode=telegram.constants.ParseMode.MARKDOWN,
    )
    prompt = await message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
    # Remember the prompt so the next handler can delete it.
    context.user_data[PROMPT_ADD_COMMENT] = prompt.message_id
    return ADD_COMMENT


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Send the welcome text and the main menu, then wait for a location."""
    hello = (
        "Hello, this is `checkin.bot`. \n\n"
        "This is a Telegram bot with functionality similar to Foursquare Swarm, "
        "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n"
        "Aware of privacy concerns, this bot will not store your location data."
        "*Be safe and cautious when sharing your real time location on the web.* \n\n"
        "Start using this bot by sharing your location using Telegram context menu to it."
    )
    await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN)
    await update.message.reply_text(PROMPT_CHOOSE_ACTION, reply_markup=MAIN_MENU)
    return WAIT_LOCATION


async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle a shared location or venue.

    A venue is checked in immediately; a bare location leads to the keyword
    search prompt.

    Returns:
        ``ADD_COMMENT`` for a venue, ``LOCATION_SEARCH_KEYWORD`` for a plain location.
    """
    venue = update.message.venue
    if venue is not None:
        context.user_data["fsq_id"] = venue.foursquare_id
        context.user_data["title"] = venue.title
        context.user_data["latitude"] = venue.location.latitude
        context.user_data["longitude"] = venue.location.longitude

        poi = get_loc(context.user_data["fsq_id"])
        return await _publish_checkin(
            update.message,
            context,
            content=_checkin_text(poi),
            summary=f"Selected place: {escape_markdown(str(poi['name']))}, ",
        )

    context.user_data["latitude"] = update.message.location.latitude
    context.user_data["longitude"] = update.message.location.longitude

    await update.message.reply_text("Searching...", reply_markup=telegram.ReplyKeyboardRemove())
    prompt_msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_LOCATION_KEYWORD] = prompt_msg.message_id
    return LOCATION_SEARCH_KEYWORD


async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Check in at a place name typed by the user, linked to the stored coordinates."""
    loc = update.effective_message.text
    osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"])
    return await _publish_checkin(
        update.message,
        context,
        content=f"I'm at {loc}, {osm_url}",
        # Free-form user text: escape it so Markdown characters cannot break the reply.
        summary=f"Manually selected place: {escape_markdown(loc)}, ",
    )


async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Check in at the POI whose inline button the user pressed."""
    query = update.callback_query
    await query.answer()
    context.user_data["fsq_id"] = query.data
    await query.delete_message()

    poi = get_loc(context.user_data["fsq_id"])
    return await _publish_checkin(
        query.message,
        context,
        content=_checkin_text(poi),
        summary=f"Selected place: {escape_markdown(str(poi['name']))}, `{query.data}`",
    )


async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Search POIs near the stored coordinates for the keyword the user typed.

    Falls back to an unfiltered nearby search when the keyword yields nothing.

    Returns:
        ``LOCATION_CONFIRMATION`` in every case.
    """
    await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_LOCATION_KEYWORD])

    location_search = update.effective_message.text
    latitude = context.user_data["latitude"]
    longitude = context.user_data["longitude"]

    poi_result = query_poi(location_search, latitude, longitude)
    if len(poi_result) == 0:
        poi_result = query_poi("", latitude, longitude)

    keyboard = _poi_keyboard(poi_result)
    if not keyboard:
        await update.message.reply_text(PROMPT_NO_NEARBY_POI)
        return LOCATION_CONFIRMATION

    context.user_data["location_search"] = location_search
    await update.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=InlineKeyboardMarkup(keyboard))
    return LOCATION_CONFIRMATION


async def callback_skip_location_keyword(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """List nearby POIs without a keyword after the user pressed "Skip"."""
    query = update.callback_query
    await query.answer()
    await query.message.delete()

    latitude = context.user_data["latitude"]
    longitude = context.user_data["longitude"]

    keyboard = _poi_keyboard(query_poi("", latitude, longitude))
    if not keyboard:
        # Same handling as the keyword search: no empty keyboard is sent.
        await query.message.reply_text(PROMPT_NO_NEARBY_POI)
        return LOCATION_CONFIRMATION

    await query.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=InlineKeyboardMarkup(keyboard))
    return LOCATION_CONFIRMATION


async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Prepend the user's comment to the posted status, then ask for photos."""
    await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT])

    comment = update.effective_message.text
    new_content = f"{comment} " + context.user_data["status_content"]
    mastodon_client.status_update(status=new_content, id=context.user_data["status_id"])
    context.user_data["status_content"] = new_content

    prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id
    return ADD_MEDIA


async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Skip the comment step after the user pressed "Skip", then ask for photos."""
    # This runs for a callback query, where update.message is None; use the query.
    query = update.callback_query
    await query.answer()
    await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT])

    prompt_attach_photo_msg = await query.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id
    return ADD_MEDIA


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the help text."""
    await update.message.reply_text(PROMPT_HELP)


async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Abort the conversation and show the main menu again."""
    await update.message.reply_text(text=PROMPT_CANCELED, reply_markup=MAIN_MENU)
    return ConversationHandler.END


async def process_media_group(context: CallbackContext):
    """Job callback: upload every queued photo of a media group and attach them.

    ``context.job.data`` is the list of ``MsgDict`` entries collected by
    ``callback_add_media`` for one media group.
    """
    batch = cast(List[MsgDict], context.job.data)
    chat_id = batch[0].get("chat_id")

    media_id = []
    for msg_dict in batch:
        if len(media_id) >= 4:
            # NOTE(review): bails out without attaching the photos already
            # uploaded; confirm this is intended by PROMPT_MAX_PHOTO_REACHED.
            await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU)
            return

        file = await context.bot.get_file(msg_dict.get("media_id"))
        img = io.BytesIO()
        await file.download_to_memory(img)
        img.seek(0)

        media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
        media_id.append(media["id"])

    last = batch[-1]
    mastodon_client.status_update(
        status=last.get("content"),
        id=last.get("status_id"),
        media_ids=media_id)

    await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU)


async def callback_add_media(update: Update, context: CallbackContext) -> Optional[int]:
    """Attach the received photo(s) to the posted status.

    Photos that belong to a media group are queued on a job named after the
    group id and processed together by ``process_media_group`` after
    ``MEDIA_GROUP_TIMEOUT``.  A single photo is uploaded and attached at once.

    Returns:
        ``ConversationHandler.END`` after a single photo; ``None`` for a media
        group photo, which leaves the conversation state unchanged so the
        remaining photos of the group are still handled.
    """
    await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)

    try:
        await context.bot.delete_message(chat_id=update.message.chat_id,
                                         message_id=context.user_data[PROMPT_ADD_MEDIA])
    except telegram.error.BadRequest as e:
        # Best effort: the prompt may already be gone, e.g. when an earlier
        # photo of the same album deleted it.  Anything else is worth a log line.
        if "not found" not in str(e.message):
            logger.warning("Could not delete the add-media prompt: %s", e.message)

    status_id = context.user_data["status_id"]
    status_content = context.user_data["status_content"]

    message = update.effective_message
    context.user_data["media"] = []

    if message.media_group_id:
        # photo[-1] is the last entry of the photo sizes list.
        media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
        msg_dict = {
            "media_id": media_id,
            "caption": message.caption_html,
            "status_id": status_id,
            "content": status_content,
            "chat_id": message.chat_id,
        }
        job_name = str(message.media_group_id)
        jobs = context.job_queue.get_jobs_by_name(job_name)
        if jobs:
            jobs[0].data.append(msg_dict)
        else:
            context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT, data=[msg_dict],
                                       name=job_name)
        return None

    file = await update.message.effective_attachment[-1].get_file()
    img = io.BytesIO()
    await file.download_to_memory(img)
    img.seek(0)

    media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
    mastodon_client.status_update(
        status=status_content,
        id=status_id,
        media_ids=media["id"])
    await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
    # The check-in is complete, same as when the media step is skipped.
    return ConversationHandler.END


async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Finish the check-in without photos after the user pressed "Skip"."""
    query = update.callback_query
    await query.answer()
    await query.delete_message()
    await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
    return ConversationHandler.END


def main() -> None:
    """Build the application, register the handlers and start polling."""
    application = Application.builder().token(BOT_TOKEN).build()

    # filters.TEXT alone also matches commands, which would swallow /cancel
    # before the fallback is ever consulted.
    text_input = filters.TEXT & ~filters.COMMAND

    checkin_handler = ConversationHandler(
        entry_points=[
            CommandHandler("start", start),
            MessageHandler(filters.LOCATION, callback_location_sharing),
        ],
        states={
            WAIT_LOCATION: [
                MessageHandler(filters.LOCATION, callback_location_sharing),
            ],
            LOCATION_SEARCH_KEYWORD: [
                MessageHandler(text_input, callback_location_keyword_search),
                CallbackQueryHandler(callback_skip_location_keyword),
            ],
            LOCATION_CONFIRMATION: [
                CallbackQueryHandler(callback_location_confirmation),
                MessageHandler(text_input, callback_manual_location)
            ],
            ADD_COMMENT: [
                MessageHandler(text_input, callback_add_comment),
                CallbackQueryHandler(callback_skip_comment),
            ],
            ADD_MEDIA: [MessageHandler(filters.PHOTO, callback_add_media),
                        CallbackQueryHandler(callback_skip_media)],
        },
        fallbacks=[CommandHandler("cancel", cancel)],
        per_message=False,
        allow_reentry=True,
    )

    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(checkin_handler)

    application.run_polling()


if __name__ == "__main__":
    main()