diff options
author | clarkzjw <[email protected]> | 2023-02-21 17:39:59 -0800 |
---|---|---|
committer | clarkzjw <[email protected]> | 2023-02-21 17:39:59 -0800 |
commit | 5b09e9d190b642a4e5571c0e4504c155f56a6f5a (patch) | |
tree | e4a1ea917b1f5e56d34394ac4d62bb0e71571bf3 /bot.py | |
parent | 431e8abcc4bf858e4ca945384a2f60702f1b5a4c (diff) | |
download | swarm2fediverse-5b09e9d190b642a4e5571c0e4504c155f56a6f5a.tar.gz |
bot: enable location search
Diffstat (limited to 'bot.py')
-rw-r--r-- | bot.py | 335 |
1 files changed, 335 insertions, 0 deletions
@@ -0,0 +1,335 @@ | |||
1 | #!/usr/bin/env python | ||
2 | # pylint: disable=unused-argument, wrong-import-position | ||
3 | # This program is dedicated to the public domain under the CC0 license. | ||
4 | |||
5 | import logging | ||
6 | import io | ||
7 | import telegram.constants | ||
8 | from telegram import __version__ as TG_VER | ||
9 | |||
10 | try: | ||
11 | from telegram import __version_info__ | ||
12 | except ImportError: | ||
13 | __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] | ||
14 | |||
15 | if __version_info__ < (20, 0, 0, "alpha", 1): | ||
16 | raise RuntimeError( | ||
17 | f"This example is not compatible with your current PTB version {TG_VER}. To view the " | ||
18 | f"{TG_VER} version of this example, " | ||
19 | f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" | ||
20 | ) | ||
21 | |||
22 | from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton | ||
23 | from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext | ||
24 | from config import BOT_TOKEN | ||
25 | from foursquare.poi import query_poi | ||
26 | from dbstore.dbm_store import get_loc | ||
27 | from toot import mastodon_client | ||
28 | from typing import TypedDict, List, cast | ||
29 | |||
30 | scheduler = None | ||
31 | PRIVACY, TOOT = map(chr, range(8, 10)) | ||
32 | |||
33 | WAIT_LOC, LOCATION, LOCATION_SEARCH, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(7) | ||
34 | |||
35 | # Enable logging | ||
36 | logging.basicConfig( | ||
37 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO | ||
38 | ) | ||
39 | logger = logging.getLogger(__name__) | ||
40 | |||
41 | MAIN_MENU = ReplyKeyboardMarkup([ | ||
42 | [telegram.KeyboardButton(text="/check in", request_location=True)], | ||
43 | [telegram.KeyboardButton(text="/cancel")], | ||
44 | [telegram.KeyboardButton(text="/setting")] | ||
45 | ]) | ||
46 | |||
47 | SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]]) | ||
48 | SETTING_MENU = ReplyKeyboardMarkup( | ||
49 | [ | ||
50 | [KeyboardButton(text="/tos")], | ||
51 | [telegram.KeyboardButton(text="/back")], | ||
52 | ] | ||
53 | ) | ||
54 | |||
55 | |||
56 | async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
57 | hello = "Hello, this is `checkin.bot`. \n\n" \ | ||
58 | "This is a Telegram bot with functionality similar to Foursquare Swarm, " \ | ||
59 | "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \ | ||
60 | "Aware of privacy concerns, this bot will not store your location data." \ | ||
61 | "*Be safe and cautious when sharing your real time location on the web.* \n\n" \ | ||
62 | "Start using this bot by sharing your location using Telegram context menu to it." | ||
63 | |||
64 | await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) | ||
65 | await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU) | ||
66 | |||
67 | return LOCATION | ||
68 | |||
69 | |||
70 | async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
71 | context.user_data["latitude"] = update.message.location.latitude | ||
72 | context.user_data["longitude"] = update.message.location.longitude | ||
73 | await update.message.reply_text("You can input location search keywords or press skip", reply_markup=SKIP_MENU) | ||
74 | |||
75 | return LOCATION_SEARCH | ||
76 | |||
77 | |||
78 | async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
79 | query = update.callback_query | ||
80 | await query.answer() | ||
81 | print(query.data) | ||
82 | context.user_data["fsq_id"] = query.data | ||
83 | await query.delete_message() | ||
84 | |||
85 | poi = get_loc(context.user_data["fsq_id"]) | ||
86 | media_id = [] | ||
87 | content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" | ||
88 | status = mastodon_client.status_post( | ||
89 | content, | ||
90 | visibility="private", | ||
91 | media_ids=media_id) | ||
92 | |||
93 | context.user_data["status_id"] = status["id"] | ||
94 | context.user_data["status_content"] = content | ||
95 | |||
96 | print("status_id", context.user_data["status_id"]) | ||
97 | |||
98 | await query.message.reply_text( | ||
99 | text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", | ||
100 | parse_mode=telegram.constants.ParseMode.MARKDOWN, | ||
101 | reply_markup=MAIN_MENU | ||
102 | ) | ||
103 | |||
104 | await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU) | ||
105 | return PHOTO | ||
106 | |||
107 | |||
108 | async def location_search_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
109 | location_search = update.effective_message.text | ||
110 | latitude = context.user_data["latitude"] | ||
111 | longitude = context.user_data["longitude"] | ||
112 | |||
113 | keyboard = [] | ||
114 | |||
115 | for poi in query_poi(location_search, latitude, longitude): | ||
116 | keyboard.append([ | ||
117 | InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), | ||
118 | ]) | ||
119 | |||
120 | reply_markup = InlineKeyboardMarkup(keyboard) | ||
121 | context.user_data["location_search"] = location_search | ||
122 | await update.message.reply_text("Where are you? ", reply_markup=reply_markup) | ||
123 | |||
124 | return WAIT_LOC | ||
125 | |||
126 | |||
127 | async def skip_location_search(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
128 | return WAIT_LOC | ||
129 | |||
130 | |||
131 | async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
132 | await update.message.reply_text("TOS", reply_markup=MAIN_MENU) | ||
133 | |||
134 | |||
135 | async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
136 | await update.message.reply_text("Setting", reply_markup=SETTING_MENU) | ||
137 | return SETTING | ||
138 | |||
139 | |||
140 | async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
141 | await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU) | ||
142 | return ConversationHandler.END | ||
143 | |||
144 | |||
145 | async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
146 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
147 | |||
148 | fsq_id = context.user_data["fsq_id"] | ||
149 | poi = get_loc(context.user_data["fsq_id"]) | ||
150 | media_id = [] | ||
151 | |||
152 | if context.user_data.get("photo") is not None: | ||
153 | media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") | ||
154 | media_id = [media["id"]] | ||
155 | # else: | ||
156 | # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) | ||
157 | # if photo_url is not None: | ||
158 | # with urllib.request.urlopen(photo_url) as response: | ||
159 | # data = response.read() | ||
160 | # media = mastodon_client.media_post(data, mime_type="image/jpeg") | ||
161 | # media_id = [media["id"]] | ||
162 | |||
163 | mastodon_client.status_post( | ||
164 | f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", | ||
165 | visibility="private", | ||
166 | media_ids=media_id) | ||
167 | |||
168 | await update.message.delete_message() | ||
169 | return ConversationHandler.END | ||
170 | |||
171 | |||
172 | async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
173 | """Displays info on how to use the bot.""" | ||
174 | await update.message.reply_text("Use /start to test this bot.") | ||
175 | |||
176 | |||
177 | async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
178 | """Cancels and ends the conversation.""" | ||
179 | user = update.message.from_user | ||
180 | logger.info("User %s canceled the conversation.", user.first_name) | ||
181 | await update.message.reply_text( | ||
182 | text="Setting canceled.", | ||
183 | # "Bye! I hope we can talk again some day.", | ||
184 | reply_markup=MAIN_MENU | ||
185 | ) | ||
186 | |||
187 | return ConversationHandler.END | ||
188 | |||
189 | |||
190 | async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
191 | """Cancels and ends the conversation.""" | ||
192 | user = update.message.from_user | ||
193 | logger.info("User %s canceled the conversation.", user.first_name) | ||
194 | await update.message.reply_text( | ||
195 | text="Canceled.", | ||
196 | # "Bye! I hope we can talk again some day.", | ||
197 | reply_markup=MAIN_MENU | ||
198 | ) | ||
199 | |||
200 | return ConversationHandler.END | ||
201 | |||
202 | |||
203 | class MsgDict(TypedDict): | ||
204 | media_id: str | ||
205 | caption: str | ||
206 | status_id: int | ||
207 | content: str | ||
208 | chat_id: int | ||
209 | |||
210 | |||
211 | async def media_group_sender(context: CallbackContext): | ||
212 | context.job.data = cast(List[MsgDict], context.job.data) | ||
213 | |||
214 | media_id = [] | ||
215 | chat_id = context.job.data[0].get("chat_id") | ||
216 | for msg_dict in context.job.data: | ||
217 | if len(media_id) >= 4: | ||
218 | print("Cannot attach more than 4 photos") | ||
219 | break | ||
220 | file = await context.bot.get_file(msg_dict.get("media_id")) | ||
221 | img = io.BytesIO() | ||
222 | await file.download_to_memory(img) | ||
223 | |||
224 | img.seek(0) | ||
225 | |||
226 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
227 | media_id.append(media["id"]) | ||
228 | |||
229 | mastodon_client.status_update( | ||
230 | status=msg_dict.get("content"), | ||
231 | id=msg_dict.get("status_id"), | ||
232 | media_ids=media_id) | ||
233 | |||
234 | await context.bot.send_message(chat_id=chat_id, text="Done", | ||
235 | reply_markup=MAIN_MENU | ||
236 | ) | ||
237 | |||
238 | |||
239 | async def photo(update: Update, context: CallbackContext): | ||
240 | """Stores the photo and asks for a location.""" | ||
241 | global scheduler | ||
242 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
243 | |||
244 | status_id = context.user_data["status_id"] | ||
245 | status_content = context.user_data["status_content"] | ||
246 | |||
247 | message = update.effective_message | ||
248 | context.user_data["media"] = [] | ||
249 | if message.media_group_id: | ||
250 | media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id | ||
251 | msg_dict = { | ||
252 | "media_id": media_id, | ||
253 | "caption": message.caption_html, | ||
254 | "status_id": status_id, | ||
255 | "content": status_content, | ||
256 | "chat_id": message.chat_id, | ||
257 | } | ||
258 | jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) | ||
259 | if jobs: | ||
260 | jobs[0].data.append(msg_dict) | ||
261 | else: | ||
262 | context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], | ||
263 | name=str(message.media_group_id)) | ||
264 | else: | ||
265 | file = await update.message.effective_attachment[-1].get_file() | ||
266 | img = io.BytesIO() | ||
267 | await file.download_to_memory(img) | ||
268 | img.seek(0) | ||
269 | |||
270 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
271 | mastodon_client.status_update( | ||
272 | status=status_content, | ||
273 | id=status_id, | ||
274 | media_ids=media["id"]) | ||
275 | |||
276 | await update.message.reply_text(text="Done", | ||
277 | reply_markup=MAIN_MENU | ||
278 | ) | ||
279 | |||
280 | |||
281 | async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
282 | print(context.user_data) | ||
283 | await update.message.reply_text( | ||
284 | text="Done.", reply_markup=MAIN_MENU | ||
285 | ) | ||
286 | return ConversationHandler.END | ||
287 | |||
288 | |||
289 | def main() -> None: | ||
290 | application = Application.builder().token(BOT_TOKEN).build() | ||
291 | |||
292 | checkin_handler = ConversationHandler( | ||
293 | entry_points=[ | ||
294 | CommandHandler("start", start), | ||
295 | MessageHandler(filters.LOCATION, checkin), | ||
296 | ], | ||
297 | states={ | ||
298 | LOCATION: [ | ||
299 | MessageHandler(filters.LOCATION, checkin), | ||
300 | ], | ||
301 | WAIT_LOC: [CallbackQueryHandler(process_callback)], | ||
302 | LOCATION_SEARCH: [ | ||
303 | MessageHandler(filters.TEXT, location_search_callback), | ||
304 | CommandHandler("skip", skip_location_search), | ||
305 | ], | ||
306 | PHOTO: [MessageHandler(filters.PHOTO, photo), | ||
307 | CommandHandler("skip", skip_photo)], | ||
308 | }, | ||
309 | fallbacks=[CommandHandler("cancel", cancel)], | ||
310 | per_message=False, | ||
311 | allow_reentry=True, | ||
312 | ) | ||
313 | |||
314 | setting_conv_handler = ConversationHandler( | ||
315 | entry_points=[CommandHandler("setting", setting)], | ||
316 | states={ | ||
317 | SETTING: [ | ||
318 | CallbackQueryHandler(setting_process_callback), | ||
319 | ], | ||
320 | }, | ||
321 | fallbacks=[CommandHandler("back", setting_cancel)], | ||
322 | per_message=False, | ||
323 | allow_reentry=True, | ||
324 | ) | ||
325 | |||
326 | application.add_handler(CommandHandler("tos", tos)) | ||
327 | application.add_handler(setting_conv_handler, 2) | ||
328 | application.add_handler(checkin_handler, 1) | ||
329 | |||
330 | # Run the bot until the user presses Ctrl-C | ||
331 | application.run_polling() | ||
332 | |||
333 | |||
334 | if __name__ == "__main__": | ||
335 | main() | ||