aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'customwebhookexample.py')
-rw-r--r--customwebhookexample.py193
1 files changed, 193 insertions, 0 deletions
diff --git a/customwebhookexample.py b/customwebhookexample.py
new file mode 100644
index 0000000..328a327
--- /dev/null
+++ b/customwebhookexample.py
@@ -0,0 +1,193 @@
1#!/usr/bin/env python
2# This program is dedicated to the public domain under the CC0 license.
3# pylint: disable=import-error,wrong-import-position
4"""
5Simple example of a bot that uses a custom webhook setup and handles custom updates.
6For the custom webhook setup, the libraries `starlette` and `uvicorn` are used. Please install
7them as `pip install starlette~=0.20.0 uvicorn~=0.17.0`.
8Note that any other `asyncio` based web server framework can be used for a custom webhook setup
9just as well.
10
11Usage:
12Set bot token, url, admin chat_id and port at the start of the `main` function.
13You may also need to change the `listen` value in the uvicorn configuration to match your setup.
14Press Ctrl-C on the command line or send a signal to the process to stop the bot.
15"""
16import asyncio
17import html
18import logging
19from dataclasses import dataclass
20from http import HTTPStatus
21from config import BOT_TOKEN
22
23import uvicorn
24from starlette.applications import Starlette
25from starlette.requests import Request
26from starlette.responses import PlainTextResponse, Response
27from starlette.routing import Route
28
29from telegram import __version__ as TG_VER
30
31try:
32 from telegram import __version_info__
33except ImportError:
34 __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
35
36if __version_info__ < (20, 0, 0, "alpha", 1):
37 raise RuntimeError(
38 f"This example is not compatible with your current PTB version {TG_VER}. To view the "
39 f"{TG_VER} version of this example, "
40 f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
41 )
42
43from telegram import Update
44from telegram.constants import ParseMode
45from telegram.ext import (
46 Application,
47 CallbackContext,
48 CommandHandler,
49 ContextTypes,
50 ExtBot,
51 TypeHandler,
52)
53
54# Enable logging
55logging.basicConfig(
56 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
57)
58logger = logging.getLogger(__name__)
59
60
61@dataclass
62class WebhookUpdate:
63 """Simple dataclass to wrap a custom update type"""
64
65 user_id: int
66 payload: str
67
68
69class CustomContext(CallbackContext[ExtBot, dict, dict, dict]):
70 """
71 Custom CallbackContext class that makes `user_data` available for updates of type
72 `WebhookUpdate`.
73 """
74
75 @classmethod
76 def from_update(
77 cls,
78 update: object,
79 application: "Application",
80 ) -> "CustomContext":
81 if isinstance(update, WebhookUpdate):
82 return cls(application=application, user_id=update.user_id)
83 return super().from_update(update, application)
84
85
86async def start(update: Update, context: CustomContext) -> None:
87 """Display a message with instructions on how to use this bot."""
88 url = context.bot_data["url"]
89 payload_url = html.escape(f"{url}/submitpayload?user_id=<your user id>&payload=<payload>")
90 text = (
91 f"To check if the bot is still running, call <code>{url}/healthcheck</code>.\n\n"
92 f"To post a custom update, call <code>{payload_url}</code>."
93 )
94 await update.message.reply_html(text=text)
95
96
97async def webhook_update(update: WebhookUpdate, context: CustomContext) -> None:
98 """Callback that handles the custom updates."""
99 chat_member = await context.bot.get_chat_member(chat_id=update.user_id, user_id=update.user_id)
100 payloads = context.user_data.setdefault("payloads", [])
101 payloads.append(update.payload)
102 combined_payloads = "</code>\n• <code>".join(payloads)
103 text = (
104 f"The user {chat_member.user.mention_html()} has sent a new payload. "
105 f"So far they have sent the following payloads: \n\n• <code>{combined_payloads}</code>"
106 )
107 await context.bot.send_message(
108 chat_id=context.bot_data["admin_chat_id"], text=text, parse_mode=ParseMode.HTML
109 )
110
111
112async def main() -> None:
113 """Set up the application and a custom webserver."""
114 url = "https://jinwei.me"
115 admin_chat_id = 123456
116 port = 8000
117
118 context_types = ContextTypes(context=CustomContext)
119 # Here we set updater to None because we want our custom webhook server to handle the updates
120 # and hence we don't need an Updater instance
121 application = (
122 Application.builder().token(BOT_TOKEN).updater(None).context_types(context_types).build()
123 )
124 # save the values in `bot_data` such that we may easily access them in the callbacks
125 application.bot_data["url"] = url
126 application.bot_data["admin_chat_id"] = admin_chat_id
127
128 # register handlers
129 application.add_handler(CommandHandler("start", start))
130 application.add_handler(TypeHandler(type=WebhookUpdate, callback=webhook_update))
131
132 # Pass webhook settings to telegram
133 await application.bot.set_webhook(url=f"{url}/telegram")
134
135 # Set up webserver
136 async def telegram(request: Request) -> Response:
137 """Handle incoming Telegram updates by putting them into the `update_queue`"""
138 await application.update_queue.put(
139 Update.de_json(data=await request.json(), bot=application.bot)
140 )
141 return Response()
142
143 async def custom_updates(request: Request) -> PlainTextResponse:
144 """
145 Handle incoming webhook updates by also putting them into the `update_queue` if
146 the required parameters were passed correctly.
147 """
148 try:
149 user_id = int(request.query_params["user_id"])
150 payload = request.query_params["payload"]
151 except KeyError:
152 return PlainTextResponse(
153 status_code=HTTPStatus.BAD_REQUEST,
154 content="Please pass both `user_id` and `payload` as query parameters.",
155 )
156 except ValueError:
157 return PlainTextResponse(
158 status_code=HTTPStatus.BAD_REQUEST,
159 content="The `user_id` must be a string!",
160 )
161
162 await application.update_queue.put(WebhookUpdate(user_id=user_id, payload=payload))
163 return PlainTextResponse("Thank you for the submission! It's being forwarded.")
164
165 async def health(_: Request) -> PlainTextResponse:
166 """For the health endpoint, reply with a simple plain text message."""
167 return PlainTextResponse(content="The bot is still running fine :)")
168
169 starlette_app = Starlette(
170 routes=[
171 Route("/telegram", telegram, methods=["POST"]),
172 Route("/healthcheck", health, methods=["GET"]),
173 Route("/submitpayload", custom_updates, methods=["POST", "GET"]),
174 ]
175 )
176 webserver = uvicorn.Server(
177 config=uvicorn.Config(
178 app=starlette_app,
179 port=port,
180 use_colors=False,
181 host="127.0.0.1",
182 )
183 )
184
185 # Run application and webserver together
186 async with application:
187 await application.start()
188 await webserver.serve()
189 await application.stop()
190
191
192if __name__ == "__main__":
193 asyncio.run(main())
Powered by cgit v1.2.3 (git 2.41.0)