import io from typing import cast, List from telegram import ReplyKeyboardRemove from telegram.constants import ChatAction from telegram.error import BadRequest from telegram.ext import CallbackContext from command import * from dbstore.peewee_store import get_poi_by_fsq_id from foursquare.poi import OSM_ENDPOINT from foursquare.poi import query_poi from config import BOT_SCOPE from dbstore.peewee_store import User, db import uuid def generate_uuid(): return str(uuid.uuid4()) def get_mastodon_client(user_id: int): with db.connection_context(): user = User.get(User.telegram_user_id == user_id) if user.home_instance and user.access_key: return Mastodon(access_token=user.access_key, api_base_url=user.home_instance) def generate_toot_text(poi_name, poi_locality, poi_region, poi_lat, poi_lon): osm_url = OSM_ENDPOINT.format(poi_lat, poi_lon) location = "" if poi_locality: location = poi_locality if poi_region: location += ", " + poi_region if location: return f"I'm at {poi_name} in {location}, {osm_url}" else: return f"I'm at {poi_name}, {osm_url}" async def get_img_file_bytes(telegram_media_file): img = io.BytesIO() await telegram_media_file.download_to_memory(img) img.seek(0) return img.read() async def process_media_group(context: CallbackContext): context.job.data = cast(List[MsgDict], context.job.data) mastodon_client = get_mastodon_client(context.user_data["user_id"]) media_id = [] chat_id = context.job.data[0].get("chat_id") for media_dict in context.job.data: if len(media_id) >= 4: await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU) return file = await context.bot.get_file(media_dict.get("media_id")) img = await get_img_file_bytes(file) media = mastodon_client.media_post(img, description=media_dict.get("caption"), mime_type="image/jpeg") media_id.append(media["id"]) mastodon_client.status_update(id=media_dict.get("status_id"), status=media_dict.get("content"), media_ids=media_id) await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU) async def callback_generate_fedi_login_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: home_instance = update.effective_message.text client_id, client_secret = Mastodon.create_app( "Checkin.bot", scopes=BOT_SCOPE, redirect_uris="{}{}".format(BOT_DOMAIN, FEDI_LOGIN_CALLBACK_URL), api_base_url=home_instance, ) print("client_id: {}".format(client_id)) print("client_secret: {}".format(client_secret)) m = Mastodon(client_id=client_id, client_secret=client_secret, api_base_url=home_instance) user_id = update.effective_user.id state = generate_uuid() db.connect() u = User.get_or_none(telegram_user_id=user_id) if u is None: u = User.create(telegram_user_id=user_id, access_key="", home_instance=home_instance, client_id=client_id, client_secret=client_secret, state=state) u.save() db.close() oauth_url = m.auth_request_url(redirect_uris="{}{}".format(BOT_DOMAIN, FEDI_LOGIN_CALLBACK_URL), scopes=BOT_SCOPE, state=state) await update.message.reply_text(PROMPT_FEDI_LOGIN, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Login", url=oauth_url)]]), parse_mode=ParseMode.MARKDOWN) return FEDI_LOGIN async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: if update.message.venue is not None: context.user_data["fsq_id"] = update.message.venue.foursquare_id context.user_data["title"] = update.message.venue.title context.user_data["latitude"] = update.message.venue.location.latitude context.user_data["longitude"] = update.message.venue.location.longitude poi = get_poi_by_fsq_id(context.user_data.get("fsq_id")) content = generate_toot_text(poi["name"], poi["locality"], poi["region"], poi["latitude"], poi["longitude"]) status = get_mastodon_client(update.effective_user.id).status_post(content, visibility=DEFAULT_TOOT_VISIBILITY, media_ids=[]) context.user_data[KEY_TOOT_STATUS_ID] = status["id"] context.user_data[KEY_TOOT_STATUS_CONTENT] = content await update.message.reply_text(text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}", parse_mode=ParseMode.MARKDOWN) msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) context.user_data[PROMPT_ADD_COMMENT] = msg.message_id return ADD_COMMENT else: context.user_data["latitude"] = update.message.location.latitude context.user_data["longitude"] = update.message.location.longitude await update.message.reply_text("Searching...", reply_markup=ReplyKeyboardRemove()) msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU) context.user_data[PROMPT_LOCATION_KEYWORD] = msg.message_id return LOCATION_SEARCH_KEYWORD async def _process_location_search(keyword, lat, lon) -> list: keyboard = [] poi_result = [] if keyword: poi_result = query_poi(keyword, lat, lon) if len(poi_result) == 0: poi_result = query_poi("", lat, lon) for poi in poi_result: keyboard.append([ InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), ]) return keyboard async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: await context.bot.delete_message(update.effective_chat.id, context.user_data.get(PROMPT_LOCATION_KEYWORD)) key = update.effective_message.text keyboard = await _process_location_search(key, context.user_data.get("latitude"), context.user_data.get("longitude")) if len(keyboard) == 0: msg = await update.message.reply_text(PROMPT_WAIT_LOCATION_CONFIRMATION_NO_NEARBY_POI) else: msg = await update.message.reply_text(PROMPT_WAIT_LOCATION_CONFIRMATION, reply_markup=InlineKeyboardMarkup(keyboard)) context.user_data[PROMPT_WAIT_LOCATION_CONFIRMATION] = msg.message_id return LOCATION_CONFIRMATION async def callback_skip_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query = update.callback_query await query.answer() await query.message.delete() keyboard = await _process_location_search("", context.user_data.get("latitude"), context.user_data.get("longitude")) await query.message.reply_text(PROMPT_WAIT_LOCATION_CONFIRMATION, reply_markup=InlineKeyboardMarkup(keyboard)) return LOCATION_CONFIRMATION async def _process_location_selection(context: ContextTypes.DEFAULT_TYPE) -> int: poi_name = context.user_data.get("poi_name") if context.user_data.get("fsq_id") is not None: poi = get_poi_by_fsq_id(context.user_data.get("fsq_id")) content = generate_toot_text(poi["name"], poi["locality"], poi["region"], poi["latitude"], poi["longitude"]) else: content = generate_toot_text(poi_name, "", "", context.user_data.get("latitude"), context.user_data.get("longitude")) status = get_mastodon_client(context.user_data["user_id"]).status_post(content, visibility=DEFAULT_TOOT_VISIBILITY, media_ids=[]) context.user_data[KEY_TOOT_STATUS_ID] = status["id"] context.user_data[KEY_TOOT_STATUS_CONTENT] = content await context.bot.send_message(chat_id=context.user_data.get("chat_id"), text=f"Selected place: {poi_name}, \nPosted to Mastodon: {status['url']}", parse_mode=ParseMode.MARKDOWN) msg = await context.bot.send_message(chat_id=context.user_data.get("chat_id"), text=PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU) context.user_data[PROMPT_ADD_COMMENT] = msg.message_id return ADD_COMMENT async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query = update.callback_query await query.answer() context.user_data["fsq_id"] = query.data context.user_data["user_id"] = update.effective_user.id await query.delete_message() context.user_data["chat_id"] = update.effective_chat.id return await _process_location_selection(context) async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: context.user_data["poi_name"] = update.effective_message.text context.user_data["chat_id"] = update.effective_chat.id context.user_data["user_id"] = update.effective_user.id return await _process_location_selection(context) async def _process_comment(context: ContextTypes.DEFAULT_TYPE) -> int: msg = await context.bot.send_message(chat_id=context.user_data.get("chat_id"), text=PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU) context.user_data[PROMPT_ADD_MEDIA] = msg.message_id return ADD_MEDIA async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: context.user_data["chat_id"] = update.effective_chat.id await context.bot.delete_message(update.effective_chat.id, context.user_data.get(PROMPT_ADD_COMMENT)) comment = update.effective_message.text get_mastodon_client(update.effective_user.id).status_update(id=context.user_data.get(KEY_TOOT_STATUS_ID), status=f"{comment} " + context.user_data.get(KEY_TOOT_STATUS_CONTENT)) context.user_data[KEY_TOOT_STATUS_CONTENT] = f"{comment} " + context.user_data.get(KEY_TOOT_STATUS_CONTENT) return await _process_comment(context) async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: context.user_data["chat_id"] = update.effective_chat.id await context.bot.delete_message(update.effective_chat.id, context.user_data.get(PROMPT_ADD_COMMENT)) return await _process_comment(context) async def callback_add_media(update: Update, context: CallbackContext): await update.message.reply_chat_action(ChatAction.TYPING) try: await context.bot.delete_message(chat_id=update.message.chat_id, message_id=context.user_data.get(PROMPT_ADD_MEDIA)) except BadRequest as e: if "not found" in str(e.message): pass mastodon_client = get_mastodon_client(update.effective_user.id) status_id = context.user_data.get(KEY_TOOT_STATUS_ID) status_content = context.user_data.get(KEY_TOOT_STATUS_CONTENT) message = update.effective_message context.user_data["media"] = [] if message.media_group_id: media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id group_media = { "media_id": media_id, "caption": message.caption_html, "status_id": status_id, "content": status_content, "chat_id": message.chat_id, } jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) if jobs: jobs[0].data.append(group_media) else: context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT, data=[group_media], name=str(message.media_group_id)) else: file = await update.message.effective_attachment[-1].get_file() img = await get_img_file_bytes(file) media = mastodon_client.media_post(img, description=message.caption_html, mime_type="image/jpeg") mastodon_client.status_update(status=status_content, id=status_id, media_ids=media["id"]) await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query = update.callback_query await query.answer() await query.delete_message() await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU) return ConversationHandler.END