diff options
author | clarkzjw <[email protected]> | 2023-02-21 13:00:07 -0800 |
---|---|---|
committer | clarkzjw <[email protected]> | 2023-02-21 13:00:07 -0800 |
commit | 431e8abcc4bf858e4ca945384a2f60702f1b5a4c (patch) | |
tree | a586f47e4a2d150a6961c6c914d0789fcf446d10 /bot | |
parent | 6602a954788e26be63c00369f6bf72822752c84b (diff) | |
download | swarm2fediverse-431e8abcc4bf858e4ca945384a2f60702f1b5a4c.tar.gz |
make bot as a module
Diffstat (limited to 'bot')
-rw-r--r-- | bot/__init__.py | 13 | ||||
-rw-r--r-- | bot/bot.py | 300 |
2 files changed, 313 insertions, 0 deletions
diff --git a/bot/__init__.py b/bot/__init__.py new file mode 100644 index 0000000..5e3c341 --- /dev/null +++ b/bot/__init__.py | |||
@@ -0,0 +1,13 @@ | |||
1 | from telegram import __version__ as TG_VER | ||
2 | |||
3 | try: | ||
4 | from telegram import __version_info__ | ||
5 | except ImportError: | ||
6 | __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] | ||
7 | |||
8 | if __version_info__ < (20, 0, 0, "alpha", 1): | ||
9 | raise RuntimeError( | ||
10 | f"This example is not compatible with your current PTB version {TG_VER}. To view the " | ||
11 | f"{TG_VER} version of this example, " | ||
12 | f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" | ||
13 | ) | ||
diff --git a/bot/bot.py b/bot/bot.py new file mode 100644 index 0000000..2710184 --- /dev/null +++ b/bot/bot.py | |||
@@ -0,0 +1,300 @@ | |||
1 | #!/usr/bin/env python | ||
2 | # pylint: disable=unused-argument, wrong-import-position | ||
3 | # This program is dedicated to the public domain under the CC0 license. | ||
4 | |||
5 | import logging | ||
6 | import io | ||
7 | import telegram.constants | ||
8 | from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton | ||
9 | from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext | ||
10 | from ..config import BOT_TOKEN | ||
11 | from foursquare.poi import query_poi | ||
12 | from dbstore.dbm_store import get_loc | ||
13 | from toot import mastodon_client | ||
14 | from typing import TypedDict, List, cast | ||
15 | |||
16 | scheduler = None | ||
17 | PRIVACY, TOOT = map(chr, range(8, 10)) | ||
18 | |||
19 | WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(6) | ||
20 | |||
21 | # Enable logging | ||
22 | logging.basicConfig( | ||
23 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO | ||
24 | ) | ||
25 | logger = logging.getLogger(__name__) | ||
26 | |||
27 | MAIN_MENU = ReplyKeyboardMarkup([ | ||
28 | [telegram.KeyboardButton(text="/check in", request_location=True)], | ||
29 | [telegram.KeyboardButton(text="/cancel")], | ||
30 | [telegram.KeyboardButton(text="/setting")] | ||
31 | ]) | ||
32 | |||
33 | SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]]) | ||
34 | SETTING_MENU = ReplyKeyboardMarkup( | ||
35 | [ | ||
36 | [KeyboardButton(text="/tos")], | ||
37 | [telegram.KeyboardButton(text="/back")], | ||
38 | ] | ||
39 | ) | ||
40 | |||
41 | |||
42 | async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
43 | hello = "Hello, this is `checkin.bot`. \n\n" \ | ||
44 | "This is a Telegram bot with functionality similar to Foursquare Swarm, " \ | ||
45 | "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \ | ||
46 | "Aware of privacy concerns, this bot will not store your location data." \ | ||
47 | "*Be safe and cautious when sharing your real time location on the web.* \n\n" \ | ||
48 | "Start using this bot by sharing your location using Telegram context menu to it." | ||
49 | |||
50 | await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) | ||
51 | await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU) | ||
52 | |||
53 | return LOCATION | ||
54 | |||
55 | |||
56 | async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
57 | keyboard = [] | ||
58 | |||
59 | for poi in query_poi(update.message.location.latitude, update.message.location.longitude): | ||
60 | keyboard.append([ | ||
61 | InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), | ||
62 | ]) | ||
63 | |||
64 | reply_markup = InlineKeyboardMarkup(keyboard) | ||
65 | await update.message.reply_text("Where are you?", reply_markup=reply_markup) | ||
66 | |||
67 | return WAIT_LOC | ||
68 | |||
69 | |||
70 | async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
71 | query = update.callback_query | ||
72 | await query.answer() | ||
73 | print(query.data) | ||
74 | context.user_data["fsq_id"] = query.data | ||
75 | await query.delete_message() | ||
76 | |||
77 | poi = get_loc(context.user_data["fsq_id"]) | ||
78 | media_id = [] | ||
79 | content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" | ||
80 | status = mastodon_client.status_post( | ||
81 | content, | ||
82 | visibility="private", | ||
83 | media_ids=media_id) | ||
84 | |||
85 | context.user_data["status_id"] = status["id"] | ||
86 | context.user_data["status_content"] = content | ||
87 | |||
88 | print("status_id", context.user_data["status_id"]) | ||
89 | |||
90 | await query.message.reply_text( | ||
91 | text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", | ||
92 | parse_mode=telegram.constants.ParseMode.MARKDOWN, | ||
93 | reply_markup=MAIN_MENU | ||
94 | ) | ||
95 | |||
96 | await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU) | ||
97 | return PHOTO | ||
98 | |||
99 | |||
100 | async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
101 | await update.message.reply_text("TOS", reply_markup=MAIN_MENU) | ||
102 | |||
103 | |||
104 | async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
105 | await update.message.reply_text("Setting", reply_markup=SETTING_MENU) | ||
106 | return SETTING | ||
107 | |||
108 | |||
109 | async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
110 | await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU) | ||
111 | return ConversationHandler.END | ||
112 | |||
113 | |||
114 | async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
115 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
116 | |||
117 | fsq_id = context.user_data["fsq_id"] | ||
118 | poi = get_loc(context.user_data["fsq_id"]) | ||
119 | media_id = [] | ||
120 | |||
121 | if context.user_data.get("photo") is not None: | ||
122 | media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") | ||
123 | media_id = [media["id"]] | ||
124 | # else: | ||
125 | # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) | ||
126 | # if photo_url is not None: | ||
127 | # with urllib.request.urlopen(photo_url) as response: | ||
128 | # data = response.read() | ||
129 | # media = mastodon_client.media_post(data, mime_type="image/jpeg") | ||
130 | # media_id = [media["id"]] | ||
131 | |||
132 | mastodon_client.status_post( | ||
133 | f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", | ||
134 | visibility="private", | ||
135 | media_ids=media_id) | ||
136 | |||
137 | await update.message.delete_message() | ||
138 | return ConversationHandler.END | ||
139 | |||
140 | |||
141 | async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
142 | """Displays info on how to use the bot.""" | ||
143 | await update.message.reply_text("Use /start to test this bot.") | ||
144 | |||
145 | |||
146 | async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
147 | """Cancels and ends the conversation.""" | ||
148 | user = update.message.from_user | ||
149 | logger.info("User %s canceled the conversation.", user.first_name) | ||
150 | await update.message.reply_text( | ||
151 | text="Setting canceled.", | ||
152 | # "Bye! I hope we can talk again some day.", | ||
153 | reply_markup=MAIN_MENU | ||
154 | ) | ||
155 | |||
156 | return ConversationHandler.END | ||
157 | |||
158 | |||
159 | async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
160 | """Cancels and ends the conversation.""" | ||
161 | user = update.message.from_user | ||
162 | logger.info("User %s canceled the conversation.", user.first_name) | ||
163 | await update.message.reply_text( | ||
164 | text="Canceled.", | ||
165 | # "Bye! I hope we can talk again some day.", | ||
166 | reply_markup=MAIN_MENU | ||
167 | ) | ||
168 | |||
169 | return ConversationHandler.END | ||
170 | |||
171 | |||
172 | class MsgDict(TypedDict): | ||
173 | media_id: str | ||
174 | caption: str | ||
175 | status_id: int | ||
176 | content: str | ||
177 | chat_id: int | ||
178 | |||
179 | |||
180 | async def media_group_sender(context: CallbackContext): | ||
181 | context.job.data = cast(List[MsgDict], context.job.data) | ||
182 | |||
183 | media_id = [] | ||
184 | chat_id = context.job.data[0].get("chat_id") | ||
185 | for msg_dict in context.job.data: | ||
186 | if len(media_id) >= 4: | ||
187 | print("Cannot attach more than 4 photos") | ||
188 | break | ||
189 | file = await context.bot.get_file(msg_dict.get("media_id")) | ||
190 | img = io.BytesIO() | ||
191 | await file.download_to_memory(img) | ||
192 | |||
193 | img.seek(0) | ||
194 | |||
195 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
196 | media_id.append(media["id"]) | ||
197 | |||
198 | mastodon_client.status_update( | ||
199 | status=msg_dict.get("content"), | ||
200 | id=msg_dict.get("status_id"), | ||
201 | media_ids=media_id) | ||
202 | |||
203 | await context.bot.send_message(chat_id=chat_id, text="Done", | ||
204 | reply_markup=MAIN_MENU | ||
205 | ) | ||
206 | |||
207 | |||
208 | async def photo(update: Update, context: CallbackContext): | ||
209 | """Stores the photo and asks for a location.""" | ||
210 | global scheduler | ||
211 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
212 | |||
213 | status_id = context.user_data["status_id"] | ||
214 | status_content = context.user_data["status_content"] | ||
215 | |||
216 | message = update.effective_message | ||
217 | context.user_data["media"] = [] | ||
218 | if message.media_group_id: | ||
219 | media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id | ||
220 | msg_dict = { | ||
221 | "media_id": media_id, | ||
222 | "caption": message.caption_html, | ||
223 | "status_id": status_id, | ||
224 | "content": status_content, | ||
225 | "chat_id": message.chat_id, | ||
226 | } | ||
227 | jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) | ||
228 | if jobs: | ||
229 | jobs[0].data.append(msg_dict) | ||
230 | else: | ||
231 | context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], | ||
232 | name=str(message.media_group_id)) | ||
233 | else: | ||
234 | file = await update.message.effective_attachment[-1].get_file() | ||
235 | img = io.BytesIO() | ||
236 | await file.download_to_memory(img) | ||
237 | img.seek(0) | ||
238 | |||
239 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
240 | mastodon_client.status_update( | ||
241 | status=status_content, | ||
242 | id=status_id, | ||
243 | media_ids=media["id"]) | ||
244 | |||
245 | await update.message.reply_text(text="Done", | ||
246 | reply_markup=MAIN_MENU | ||
247 | ) | ||
248 | |||
249 | |||
250 | async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
251 | print(context.user_data) | ||
252 | await update.message.reply_text( | ||
253 | text="Done.", reply_markup=MAIN_MENU | ||
254 | ) | ||
255 | return ConversationHandler.END | ||
256 | |||
257 | |||
258 | def main() -> None: | ||
259 | application = Application.builder().token(BOT_TOKEN).build() | ||
260 | |||
261 | checkin_handler = ConversationHandler( | ||
262 | entry_points=[ | ||
263 | CommandHandler("start", start), | ||
264 | MessageHandler(filters.LOCATION, checkin), | ||
265 | ], | ||
266 | states={ | ||
267 | LOCATION: [ | ||
268 | MessageHandler(filters.LOCATION, checkin), | ||
269 | ], | ||
270 | WAIT_LOC: [CallbackQueryHandler(process_callback)], | ||
271 | PHOTO: [MessageHandler(filters.PHOTO, photo), | ||
272 | CommandHandler("skip", skip_photo)], | ||
273 | }, | ||
274 | fallbacks=[CommandHandler("cancel", cancel)], | ||
275 | per_message=False, | ||
276 | allow_reentry=True, | ||
277 | ) | ||
278 | |||
279 | setting_conv_handler = ConversationHandler( | ||
280 | entry_points=[CommandHandler("setting", setting)], | ||
281 | states={ | ||
282 | SETTING: [ | ||
283 | CallbackQueryHandler(setting_process_callback), | ||
284 | ], | ||
285 | }, | ||
286 | fallbacks=[CommandHandler("back", setting_cancel)], | ||
287 | per_message=False, | ||
288 | allow_reentry=True, | ||
289 | ) | ||
290 | |||
291 | application.add_handler(CommandHandler("tos", tos)) | ||
292 | application.add_handler(setting_conv_handler, 2) | ||
293 | application.add_handler(checkin_handler, 1) | ||
294 | |||
295 | # Run the bot until the user presses Ctrl-C | ||
296 | application.run_polling() | ||
297 | |||
298 | |||
299 | if __name__ == "__main__": | ||
300 | main() | ||