#!/usr/bin/env python
# This program is dedicated to the public domain under the CC0 license.
# pylint: disable=import-error,wrong-import-position
"""
Simple example of a bot that uses a custom webhook setup and handles custom updates.
For the custom webhook setup, the libraries `starlette` and `uvicorn` are used. Please install
them as `pip install starlette~=0.20.0 uvicorn~=0.17.0`.
Note that any other `asyncio` based web server framework can be used for a custom webhook setup
just as well.
Usage:
Set `BOT_TOKEN` in `config.py` and the url, admin chat_id and port at the start of `main`.
You may also need to change the `host` value in the uvicorn configuration to match your setup.
Press Ctrl-C on the command line or send a signal to the process to stop the bot.
"""
import asyncio
import html
import logging
from dataclasses import dataclass
from http import HTTPStatus
from config import BOT_TOKEN
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response
from starlette.routing import Route
from telegram import __version__ as TG_VER
try:
    # `__version_info__` may not be importable from the installed `telegram` package
    from telegram import __version_info__
except ImportError:
    # Fall back to an all-zero version so that the compatibility check below fails
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Refuse to run on anything older than 20.0.0 alpha 1 and point to the matching example docs
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
from telegram import Update
from telegram.constants import ParseMode
from telegram.ext import (
Application,
CallbackContext,
CommandHandler,
ContextTypes,
ExtBot,
TypeHandler,
)
# Configure logging at INFO level and create the logger used by this module
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@dataclass
class WebhookUpdate:
    """Custom update type carrying a payload submitted for a given user."""

    # Read from the `user_id` query parameter and converted to `int` by the caller
    user_id: int
    # Read from the `payload` query parameter
    payload: str
class CustomContext(CallbackContext[ExtBot, dict, dict, dict]):
    """
    Context class used for all callbacks of this bot. It makes `user_data` available for
    updates of type `WebhookUpdate` by building the context from the update's `user_id`.
    """

    @classmethod
    def from_update(
        cls,
        update: object,
        application: "Application",
    ) -> "CustomContext":
        """
        Build the context for `update`. Anything that is not a `WebhookUpdate` is delegated to
        the parent class; a `WebhookUpdate` yields a context bound to its `user_id`.
        """
        if not isinstance(update, WebhookUpdate):
            return super().from_update(update, application)
        return cls(application=application, user_id=update.user_id)
async def start(update: Update, context: CustomContext) -> None:
    """Display a message with instructions on how to use this bot.

    Replies with an HTML-formatted message that names the health check endpoint and the endpoint
    for posting custom updates. Both are built from the base URL stored in
    `context.bot_data["url"]`.
    """
    base_url = context.bot_data["url"]
    # Escape everything that is interpolated into the HTML message, not just the payload URL
    healthcheck_url = html.escape(f"{base_url}/healthcheck")
    payload_url = html.escape(
        f"{base_url}/submitpayload?user_id=<your user id>&payload=<payload>"
    )
    text = (
        f"To check if the bot is still running, call <code>{healthcheck_url}</code>.\n\n"
        f"To post a custom update, call <code>{payload_url}</code>."
    )
    # `update.message` is None when the command arrives as an edited message, so reply to
    # whichever message the update actually carries
    await update.effective_message.reply_html(text=text)
async def webhook_update(update: WebhookUpdate, context: CustomContext) -> None:
    """Callback that handles the custom updates.

    Appends the update's payload to the `"payloads"` list in `context.user_data` and sends an
    HTML-formatted summary of all payloads stored there to the chat configured in
    `context.bot_data["admin_chat_id"]`.
    """
    chat_member = await context.bot.get_chat_member(chat_id=update.user_id, user_id=update.user_id)
    payloads = context.user_data.setdefault("payloads", [])
    payloads.append(update.payload)
    # Payloads are arbitrary strings and the message is parsed as HTML, so they must be escaped.
    # Otherwise a payload containing `<`, `>` or `&` could inject markup or make the message
    # unparsable. The raw payloads are stored; escaping happens only when rendering.
    combined_payloads = "</code>\n• <code>".join(html.escape(payload) for payload in payloads)
    text = (
        f"The user {chat_member.user.mention_html()} has sent a new payload. "
        f"So far they have sent the following payloads: \n\n• <code>{combined_payloads}</code>"
    )
    await context.bot.send_message(
        chat_id=context.bot_data["admin_chat_id"], text=text, parse_mode=ParseMode.HTML
    )
async def main() -> None:
    """Set up the application and a custom webserver and run both until the server exits.

    The webserver listens on `127.0.0.1` at the configured port and serves three routes:

    * `POST /telegram`: puts incoming Telegram updates into the application's update queue
    * `GET /healthcheck`: replies with a plain text status message
    * `GET|POST /submitpayload`: turns the `user_id` and `payload` query parameters into a
      `WebhookUpdate` and puts it into the update queue
    """
    url = "https://jinwei.me"
    admin_chat_id = 123456
    port = 8000

    context_types = ContextTypes(context=CustomContext)
    # Here we set updater to None because we want our custom webhook server to handle the updates
    # and hence we don't need an Updater instance
    application = (
        Application.builder().token(BOT_TOKEN).updater(None).context_types(context_types).build()
    )
    # save the values in `bot_data` such that we may easily access them in the callbacks
    application.bot_data["url"] = url
    application.bot_data["admin_chat_id"] = admin_chat_id

    # register handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(TypeHandler(type=WebhookUpdate, callback=webhook_update))

    # Pass webhook settings to telegram
    await application.bot.set_webhook(url=f"{url}/telegram")

    # Set up webserver
    async def telegram(request: Request) -> Response:
        """Handle incoming Telegram updates by putting them into the `update_queue`"""
        await application.update_queue.put(
            Update.de_json(data=await request.json(), bot=application.bot)
        )
        return Response()

    async def custom_updates(request: Request) -> PlainTextResponse:
        """
        Handle incoming webhook updates by also putting them into the `update_queue` if
        the required parameters were passed correctly. Replies with status 400 if a parameter
        is missing or `user_id` is not an integer.
        """
        try:
            user_id = int(request.query_params["user_id"])
            payload = request.query_params["payload"]
        except KeyError:
            return PlainTextResponse(
                status_code=HTTPStatus.BAD_REQUEST,
                content="Please pass both `user_id` and `payload` as query parameters.",
            )
        except ValueError:
            # Raised by `int(...)` above, so the problem is a non-integer `user_id`
            return PlainTextResponse(
                status_code=HTTPStatus.BAD_REQUEST,
                content="The `user_id` must be an integer!",
            )

        await application.update_queue.put(WebhookUpdate(user_id=user_id, payload=payload))
        return PlainTextResponse("Thank you for the submission! It's being forwarded.")

    async def health(_: Request) -> PlainTextResponse:
        """For the health endpoint, reply with a simple plain text message."""
        return PlainTextResponse(content="The bot is still running fine :)")

    starlette_app = Starlette(
        routes=[
            Route("/telegram", telegram, methods=["POST"]),
            Route("/healthcheck", health, methods=["GET"]),
            Route("/submitpayload", custom_updates, methods=["POST", "GET"]),
        ]
    )
    webserver = uvicorn.Server(
        config=uvicorn.Config(
            app=starlette_app,
            port=port,
            use_colors=False,
            host="127.0.0.1",
        )
    )

    # Run application and webserver together
    async with application:
        await application.start()
        try:
            await webserver.serve()
        finally:
            # Stop the application even if the webserver exits with an error, so that it is
            # not left running when the context manager exits
            await application.stop()
# Only start the bot when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    asyncio.run(main())