#!/usr/bin/env python import asyncio import logging from dataclasses import dataclass from http import HTTPStatus from config import BOT_TOKEN, TELEGRAM_WEBHOOK_URL, HEALTHCHECK_URL, FEDI_LOGIN_CALLBACK_URL, BOT_DOMAIN, BOT_PORT, ENCRYPT_KEY import uvicorn from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import PlainTextResponse, Response, JSONResponse from starlette.routing import Route from telegram import Update from telegram.ext import ( Application, CallbackContext, ContextTypes, ExtBot, CallbackQueryHandler, CommandHandler, MessageHandler, filters, ConversationHandler, TypeHandler, ) from callback import ( callback_generate_fedi_login_url, callback_skip_media, callback_location_sharing, callback_manual_location, callback_location_confirmation, callback_location_keyword_search, callback_skip_location_keyword_search, callback_add_comment, callback_skip_comment, callback_add_media ) from command import ( start_command, fedi_login_command, cancel_command, help_command, tos_command, toggle_visibility_command, callback_toggle_visibility, logout_command, list_command ) from config import ( FEDI_LOGIN, WAIT_LOCATION, PROMPT_FEDI_LOGIN, LOCATION_SEARCH_KEYWORD, LOCATION_CONFIRMATION, ADD_MEDIA, ADD_COMMENT, BOT_TOKEN, BOT_SCOPE, MAIN_MENU, WAIT_VISIBILITY ) from prompt.string import PROMPT_CHOOSE_ACTION from mastodon import Mastodon from dbstore.peewee_store import db, User, get_user_by_state from util import encrypt, decrypt logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO ) logger = logging.getLogger(__name__) @dataclass class FediLoginCallbackUpdate: code: str state: str user_id: str class FediLoginCallbackContext(CallbackContext[ExtBot, dict, dict, dict]): """ Custom CallbackContext class that makes `user_data` available for updates of type `WebhookUpdate`. """ @classmethod def from_update(cls, update: object, application: "Application") -> "FediLoginCallbackContext": if isinstance(update, FediLoginCallbackUpdate): return cls(application=application, user_id=int(update.user_id)) return super().from_update(update, application) async def process_oauth_login_callback(update: FediLoginCallbackUpdate, context: FediLoginCallbackContext) -> None: state = update.state with db.connection_context(): user = User.get(User.state == state) client_id = user.client_id client_secret = user.client_secret home_instance = user.home_instance if len(user.access_key) == 0: mastodon_client = Mastodon(client_id=client_id, client_secret=client_secret, api_base_url=home_instance, version_check_mode="none") access_token = mastodon_client.log_in( code=update.code, redirect_uri="{}{}".format(BOT_DOMAIN, FEDI_LOGIN_CALLBACK_URL), scopes=BOT_SCOPE ) instance_info = mastodon_client.instance_nodeinfo() if instance_info["software"]["name"] == "pleroma": user.home_instance_type = "pleroma" user.access_key = encrypt(access_token, ENCRYPT_KEY) user.save() text = "You have successfully logged in to your Mastodon account!" await context.bot.delete_message(chat_id=user.telegram_user_id, message_id=context.user_data[PROMPT_FEDI_LOGIN]) await context.bot.send_message(chat_id=user.telegram_user_id, text=text) await context.bot.send_message(chat_id=user.telegram_user_id, text=PROMPT_CHOOSE_ACTION, reply_markup=MAIN_MENU) async def main() -> None: context_types = ContextTypes(context=FediLoginCallbackContext) application = ( Application.builder().updater(None).token(BOT_TOKEN).context_types(context_types).build() ) # TODO: # check user login status before invoking commands checkin_handler = ConversationHandler( entry_points=[ CommandHandler("start", start_command), CommandHandler("login", fedi_login_command), MessageHandler(filters.LOCATION, callback_location_sharing), ], states={ FEDI_LOGIN: [ MessageHandler(filters.TEXT & ~filters.COMMAND, callback_generate_fedi_login_url), ], WAIT_LOCATION: [ MessageHandler(filters.LOCATION, callback_location_sharing), ], LOCATION_SEARCH_KEYWORD: [ MessageHandler(filters.TEXT & ~filters.COMMAND, callback_location_keyword_search), CallbackQueryHandler(callback_skip_location_keyword_search), ], LOCATION_CONFIRMATION: [ CallbackQueryHandler(callback_location_confirmation), MessageHandler(filters.TEXT & ~filters.COMMAND, callback_manual_location) ], ADD_COMMENT: [ MessageHandler(filters.TEXT & ~filters.COMMAND, callback_add_comment), CallbackQueryHandler(callback_skip_comment), ], ADD_MEDIA: [MessageHandler(filters.PHOTO, callback_add_media), CallbackQueryHandler(callback_skip_media)], }, fallbacks=[CommandHandler("cancel", cancel_command)], per_message=False, allow_reentry=True, ) # register handlers application.add_handler(CommandHandler("tos", tos_command)) visibility_conversation_handler = ConversationHandler( entry_points=[ CommandHandler("vis", toggle_visibility_command) ], states={ WAIT_VISIBILITY: [ CallbackQueryHandler(callback_toggle_visibility) ]}, fallbacks=[CommandHandler("cancel", cancel_command)], per_message=False, allow_reentry=True, ) application.add_handler(CommandHandler("logout", logout_command)) application.add_handler(CommandHandler("list", list_command)) application.add_handler(CommandHandler("Help", help_command)) application.add_handler(TypeHandler(type=FediLoginCallbackUpdate, callback=process_oauth_login_callback)) application.add_handler(visibility_conversation_handler, 2) application.add_handler(checkin_handler, 1) # Pass webhook settings to telegram await application.bot.set_webhook(url=f"{BOT_DOMAIN}{TELEGRAM_WEBHOOK_URL}") # Set up webserver async def telegram_webhook(request: Request) -> JSONResponse: """Handle incoming Telegram updates by putting them into the `update_queue`""" await application.update_queue.put( Update.de_json(data=await request.json(), bot=application.bot) ) return JSONResponse({'OK': 200}) async def fedi_oauth_login_callback(request: Request) -> PlainTextResponse: """ Handle incoming webhook updates by also putting them into the `update_queue` if the required parameters were passed correctly. """ try: code = request.query_params["code"] state = request.query_params.get("state") user = get_user_by_state(state) except KeyError: return PlainTextResponse( status_code=HTTPStatus.BAD_REQUEST, content="Mastodon callback request doesn't contain a valid OAuth code", ) await application.update_queue.put(FediLoginCallbackUpdate(state=state, code=code, user_id=user["telegram_user_id"])) return PlainTextResponse("Thank you for login! Now you can close the browser") async def healthcheck(_: Request) -> PlainTextResponse: return PlainTextResponse(content="OK") starlette_app = Starlette( routes=[ Route(TELEGRAM_WEBHOOK_URL, telegram_webhook, methods=["POST"]), Route(HEALTHCHECK_URL, healthcheck, methods=["GET"]), Route(FEDI_LOGIN_CALLBACK_URL, fedi_oauth_login_callback, methods=["GET"]), ] ) webserver = uvicorn.Server( config=uvicorn.Config( app=starlette_app, port=BOT_PORT, use_colors=False, host="0.0.0.0", ) ) # Run application and webserver together async with application: await application.start() await webserver.serve() await application.stop() if __name__ == "__main__": asyncio.run(main())