diff options
Diffstat (limited to 'bot.py')
-rw-r--r-- | bot.py | 314 |
1 files changed, 314 insertions, 0 deletions
@@ -0,0 +1,314 @@ | |||
1 | #!/usr/bin/env python | ||
2 | # pylint: disable=unused-argument, wrong-import-position | ||
3 | # This program is dedicated to the public domain under the CC0 license. | ||
4 | |||
5 | import logging | ||
6 | import io | ||
7 | import telegram.constants | ||
8 | from telegram import __version__ as TG_VER | ||
9 | |||
10 | try: | ||
11 | from telegram import __version_info__ | ||
12 | except ImportError: | ||
13 | __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] | ||
14 | |||
15 | if __version_info__ < (20, 0, 0, "alpha", 1): | ||
16 | raise RuntimeError( | ||
17 | f"This example is not compatible with your current PTB version {TG_VER}. To view the " | ||
18 | f"{TG_VER} version of this example, " | ||
19 | f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" | ||
20 | ) | ||
21 | |||
22 | from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton | ||
23 | from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext | ||
24 | from ..config import BOT_TOKEN | ||
25 | from foursquare.poi import query_poi | ||
26 | from dbstore.dbm_store import get_loc | ||
27 | from toot import mastodon_client | ||
28 | from typing import TypedDict, List, cast | ||
29 | |||
30 | scheduler = None | ||
31 | PRIVACY, TOOT = map(chr, range(8, 10)) | ||
32 | |||
33 | WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(6) | ||
34 | |||
35 | # Enable logging | ||
36 | logging.basicConfig( | ||
37 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO | ||
38 | ) | ||
39 | logger = logging.getLogger(__name__) | ||
40 | |||
41 | MAIN_MENU = ReplyKeyboardMarkup([ | ||
42 | [telegram.KeyboardButton(text="/check in", request_location=True)], | ||
43 | [telegram.KeyboardButton(text="/cancel")], | ||
44 | [telegram.KeyboardButton(text="/setting")] | ||
45 | ]) | ||
46 | |||
47 | SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]]) | ||
48 | SETTING_MENU = ReplyKeyboardMarkup( | ||
49 | [ | ||
50 | [KeyboardButton(text="/tos")], | ||
51 | [telegram.KeyboardButton(text="/back")], | ||
52 | ] | ||
53 | ) | ||
54 | |||
55 | |||
56 | async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
57 | hello = "Hello, this is `checkin.bot`. \n\n" \ | ||
58 | "This is a Telegram bot with functionality similar to Foursquare Swarm, " \ | ||
59 | "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \ | ||
60 | "Aware of privacy concerns, this bot will not store your location data." \ | ||
61 | "*Be safe and cautious when sharing your real time location on the web.* \n\n" \ | ||
62 | "Start using this bot by sharing your location using Telegram context menu to it." | ||
63 | |||
64 | await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) | ||
65 | await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU) | ||
66 | |||
67 | return LOCATION | ||
68 | |||
69 | |||
70 | async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
71 | keyboard = [] | ||
72 | |||
73 | for poi in query_poi(update.message.location.latitude, update.message.location.longitude): | ||
74 | keyboard.append([ | ||
75 | InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), | ||
76 | ]) | ||
77 | |||
78 | reply_markup = InlineKeyboardMarkup(keyboard) | ||
79 | await update.message.reply_text("Where are you?", reply_markup=reply_markup) | ||
80 | |||
81 | return WAIT_LOC | ||
82 | |||
83 | |||
84 | async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
85 | query = update.callback_query | ||
86 | await query.answer() | ||
87 | print(query.data) | ||
88 | context.user_data["fsq_id"] = query.data | ||
89 | await query.delete_message() | ||
90 | |||
91 | poi = get_loc(context.user_data["fsq_id"]) | ||
92 | media_id = [] | ||
93 | content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" | ||
94 | status = mastodon_client.status_post( | ||
95 | content, | ||
96 | visibility="private", | ||
97 | media_ids=media_id) | ||
98 | |||
99 | context.user_data["status_id"] = status["id"] | ||
100 | context.user_data["status_content"] = content | ||
101 | |||
102 | print("status_id", context.user_data["status_id"]) | ||
103 | |||
104 | await query.message.reply_text( | ||
105 | text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", | ||
106 | parse_mode=telegram.constants.ParseMode.MARKDOWN, | ||
107 | reply_markup=MAIN_MENU | ||
108 | ) | ||
109 | |||
110 | await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU) | ||
111 | return PHOTO | ||
112 | |||
113 | |||
114 | async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
115 | await update.message.reply_text("TOS", reply_markup=MAIN_MENU) | ||
116 | |||
117 | |||
118 | async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
119 | await update.message.reply_text("Setting", reply_markup=SETTING_MENU) | ||
120 | return SETTING | ||
121 | |||
122 | |||
123 | async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
124 | await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU) | ||
125 | return ConversationHandler.END | ||
126 | |||
127 | |||
128 | async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
129 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
130 | |||
131 | fsq_id = context.user_data["fsq_id"] | ||
132 | poi = get_loc(context.user_data["fsq_id"]) | ||
133 | media_id = [] | ||
134 | |||
135 | if context.user_data.get("photo") is not None: | ||
136 | media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") | ||
137 | media_id = [media["id"]] | ||
138 | # else: | ||
139 | # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) | ||
140 | # if photo_url is not None: | ||
141 | # with urllib.request.urlopen(photo_url) as response: | ||
142 | # data = response.read() | ||
143 | # media = mastodon_client.media_post(data, mime_type="image/jpeg") | ||
144 | # media_id = [media["id"]] | ||
145 | |||
146 | mastodon_client.status_post( | ||
147 | f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", | ||
148 | visibility="private", | ||
149 | media_ids=media_id) | ||
150 | |||
151 | await update.message.delete_message() | ||
152 | return ConversationHandler.END | ||
153 | |||
154 | |||
155 | async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | ||
156 | """Displays info on how to use the bot.""" | ||
157 | await update.message.reply_text("Use /start to test this bot.") | ||
158 | |||
159 | |||
160 | async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
161 | """Cancels and ends the conversation.""" | ||
162 | user = update.message.from_user | ||
163 | logger.info("User %s canceled the conversation.", user.first_name) | ||
164 | await update.message.reply_text( | ||
165 | text="Setting canceled.", | ||
166 | # "Bye! I hope we can talk again some day.", | ||
167 | reply_markup=MAIN_MENU | ||
168 | ) | ||
169 | |||
170 | return ConversationHandler.END | ||
171 | |||
172 | |||
173 | async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: | ||
174 | """Cancels and ends the conversation.""" | ||
175 | user = update.message.from_user | ||
176 | logger.info("User %s canceled the conversation.", user.first_name) | ||
177 | await update.message.reply_text( | ||
178 | text="Canceled.", | ||
179 | # "Bye! I hope we can talk again some day.", | ||
180 | reply_markup=MAIN_MENU | ||
181 | ) | ||
182 | |||
183 | return ConversationHandler.END | ||
184 | |||
185 | |||
186 | class MsgDict(TypedDict): | ||
187 | media_id: str | ||
188 | caption: str | ||
189 | status_id: int | ||
190 | content: str | ||
191 | chat_id: int | ||
192 | |||
193 | |||
194 | async def media_group_sender(context: CallbackContext): | ||
195 | context.job.data = cast(List[MsgDict], context.job.data) | ||
196 | |||
197 | media_id = [] | ||
198 | chat_id = context.job.data[0].get("chat_id") | ||
199 | for msg_dict in context.job.data: | ||
200 | if len(media_id) >= 4: | ||
201 | print("Cannot attach more than 4 photos") | ||
202 | break | ||
203 | file = await context.bot.get_file(msg_dict.get("media_id")) | ||
204 | img = io.BytesIO() | ||
205 | await file.download_to_memory(img) | ||
206 | |||
207 | img.seek(0) | ||
208 | |||
209 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
210 | media_id.append(media["id"]) | ||
211 | |||
212 | mastodon_client.status_update( | ||
213 | status=msg_dict.get("content"), | ||
214 | id=msg_dict.get("status_id"), | ||
215 | media_ids=media_id) | ||
216 | |||
217 | await context.bot.send_message(chat_id=chat_id, text="Done", | ||
218 | reply_markup=MAIN_MENU | ||
219 | ) | ||
220 | |||
221 | |||
222 | async def photo(update: Update, context: CallbackContext): | ||
223 | """Stores the photo and asks for a location.""" | ||
224 | global scheduler | ||
225 | await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) | ||
226 | |||
227 | status_id = context.user_data["status_id"] | ||
228 | status_content = context.user_data["status_content"] | ||
229 | |||
230 | message = update.effective_message | ||
231 | context.user_data["media"] = [] | ||
232 | if message.media_group_id: | ||
233 | media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id | ||
234 | msg_dict = { | ||
235 | "media_id": media_id, | ||
236 | "caption": message.caption_html, | ||
237 | "status_id": status_id, | ||
238 | "content": status_content, | ||
239 | "chat_id": message.chat_id, | ||
240 | } | ||
241 | jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) | ||
242 | if jobs: | ||
243 | jobs[0].data.append(msg_dict) | ||
244 | else: | ||
245 | context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], | ||
246 | name=str(message.media_group_id)) | ||
247 | else: | ||
248 | file = await update.message.effective_attachment[-1].get_file() | ||
249 | img = io.BytesIO() | ||
250 | await file.download_to_memory(img) | ||
251 | img.seek(0) | ||
252 | |||
253 | media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") | ||
254 | mastodon_client.status_update( | ||
255 | status=status_content, | ||
256 | id=status_id, | ||
257 | media_ids=media["id"]) | ||
258 | |||
259 | await update.message.reply_text(text="Done", | ||
260 | reply_markup=MAIN_MENU | ||
261 | ) | ||
262 | |||
263 | |||
264 | async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): | ||
265 | print(context.user_data) | ||
266 | await update.message.reply_text( | ||
267 | text="Done.", reply_markup=MAIN_MENU | ||
268 | ) | ||
269 | return ConversationHandler.END | ||
270 | |||
271 | |||
272 | def main() -> None: | ||
273 | application = Application.builder().token(BOT_TOKEN).build() | ||
274 | |||
275 | checkin_handler = ConversationHandler( | ||
276 | entry_points=[ | ||
277 | CommandHandler("start", start), | ||
278 | MessageHandler(filters.LOCATION, checkin), | ||
279 | ], | ||
280 | states={ | ||
281 | LOCATION: [ | ||
282 | MessageHandler(filters.LOCATION, checkin), | ||
283 | ], | ||
284 | WAIT_LOC: [CallbackQueryHandler(process_callback)], | ||
285 | PHOTO: [MessageHandler(filters.PHOTO, photo), | ||
286 | CommandHandler("skip", skip_photo)], | ||
287 | }, | ||
288 | fallbacks=[CommandHandler("cancel", cancel)], | ||
289 | per_message=False, | ||
290 | allow_reentry=True, | ||
291 | ) | ||
292 | |||
293 | setting_conv_handler = ConversationHandler( | ||
294 | entry_points=[CommandHandler("setting", setting)], | ||
295 | states={ | ||
296 | SETTING: [ | ||
297 | CallbackQueryHandler(setting_process_callback), | ||
298 | ], | ||
299 | }, | ||
300 | fallbacks=[CommandHandler("back", setting_cancel)], | ||
301 | per_message=False, | ||
302 | allow_reentry=True, | ||
303 | ) | ||
304 | |||
305 | application.add_handler(CommandHandler("tos", tos)) | ||
306 | application.add_handler(setting_conv_handler, 2) | ||
307 | application.add_handler(checkin_handler, 1) | ||
308 | |||
309 | # Run the bot until the user presses Ctrl-C | ||
310 | application.run_polling() | ||
311 | |||
312 | |||
313 | if __name__ == "__main__": | ||
314 | main() | ||