aboutsummaryrefslogtreecommitdiff
path: root/bot.py
diff options
context:
space:
mode:
Diffstat (limited to 'bot.py')
-rw-r--r--bot.py335
1 files changed, 335 insertions, 0 deletions
diff --git a/bot.py b/bot.py
new file mode 100644
index 0000000..efde56f
--- /dev/null
+++ b/bot.py
@@ -0,0 +1,335 @@
1#!/usr/bin/env python
2# pylint: disable=unused-argument, wrong-import-position
3# This program is dedicated to the public domain under the CC0 license.
4
5import logging
6import io
7import telegram.constants
8from telegram import __version__ as TG_VER
9
10try:
11 from telegram import __version_info__
12except ImportError:
13 __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
14
15if __version_info__ < (20, 0, 0, "alpha", 1):
16 raise RuntimeError(
17 f"This example is not compatible with your current PTB version {TG_VER}. To view the "
18 f"{TG_VER} version of this example, "
19 f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
20 )
21
22from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
23from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext
24from config import BOT_TOKEN
25from foursquare.poi import query_poi
26from dbstore.dbm_store import get_loc
27from toot import mastodon_client
28from typing import TypedDict, List, cast
29
30scheduler = None
31PRIVACY, TOOT = map(chr, range(8, 10))
32
33WAIT_LOC, LOCATION, LOCATION_SEARCH, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(7)
34
35# Enable logging
36logging.basicConfig(
37 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
38)
39logger = logging.getLogger(__name__)
40
41MAIN_MENU = ReplyKeyboardMarkup([
42 [telegram.KeyboardButton(text="/check in", request_location=True)],
43 [telegram.KeyboardButton(text="/cancel")],
44 [telegram.KeyboardButton(text="/setting")]
45])
46
47SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]])
48SETTING_MENU = ReplyKeyboardMarkup(
49 [
50 [KeyboardButton(text="/tos")],
51 [telegram.KeyboardButton(text="/back")],
52 ]
53)
54
55
56async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
57 hello = "Hello, this is `checkin.bot`. \n\n" \
58 "This is a Telegram bot with functionality similar to Foursquare Swarm, " \
59 "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \
60 "Aware of privacy concerns, this bot will not store your location data." \
61 "*Be safe and cautious when sharing your real time location on the web.* \n\n" \
62 "Start using this bot by sharing your location using Telegram context menu to it."
63
64 await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN)
65 await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU)
66
67 return LOCATION
68
69
70async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
71 context.user_data["latitude"] = update.message.location.latitude
72 context.user_data["longitude"] = update.message.location.longitude
73 await update.message.reply_text("You can input location search keywords or press skip", reply_markup=SKIP_MENU)
74
75 return LOCATION_SEARCH
76
77
78async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
79 query = update.callback_query
80 await query.answer()
81 print(query.data)
82 context.user_data["fsq_id"] = query.data
83 await query.delete_message()
84
85 poi = get_loc(context.user_data["fsq_id"])
86 media_id = []
87 content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
88 status = mastodon_client.status_post(
89 content,
90 visibility="private",
91 media_ids=media_id)
92
93 context.user_data["status_id"] = status["id"]
94 context.user_data["status_content"] = content
95
96 print("status_id", context.user_data["status_id"])
97
98 await query.message.reply_text(
99 text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}",
100 parse_mode=telegram.constants.ParseMode.MARKDOWN,
101 reply_markup=MAIN_MENU
102 )
103
104 await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU)
105 return PHOTO
106
107
108async def location_search_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
109 location_search = update.effective_message.text
110 latitude = context.user_data["latitude"]
111 longitude = context.user_data["longitude"]
112
113 keyboard = []
114
115 for poi in query_poi(location_search, latitude, longitude):
116 keyboard.append([
117 InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
118 ])
119
120 reply_markup = InlineKeyboardMarkup(keyboard)
121 context.user_data["location_search"] = location_search
122 await update.message.reply_text("Where are you? ", reply_markup=reply_markup)
123
124 return WAIT_LOC
125
126
127async def skip_location_search(update: Update, context: ContextTypes.DEFAULT_TYPE):
128 return WAIT_LOC
129
130
131async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
132 await update.message.reply_text("TOS", reply_markup=MAIN_MENU)
133
134
135async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
136 await update.message.reply_text("Setting", reply_markup=SETTING_MENU)
137 return SETTING
138
139
140async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
141 await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU)
142 return ConversationHandler.END
143
144
145async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE):
146 await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
147
148 fsq_id = context.user_data["fsq_id"]
149 poi = get_loc(context.user_data["fsq_id"])
150 media_id = []
151
152 if context.user_data.get("photo") is not None:
153 media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg")
154 media_id = [media["id"]]
155 # else:
156 # photo_url = get_poi_top_photo(context.user_data["fsq_id"])
157 # if photo_url is not None:
158 # with urllib.request.urlopen(photo_url) as response:
159 # data = response.read()
160 # media = mastodon_client.media_post(data, mime_type="image/jpeg")
161 # media_id = [media["id"]]
162
163 mastodon_client.status_post(
164 f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}",
165 visibility="private",
166 media_ids=media_id)
167
168 await update.message.delete_message()
169 return ConversationHandler.END
170
171
172async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
173 """Displays info on how to use the bot."""
174 await update.message.reply_text("Use /start to test this bot.")
175
176
177async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
178 """Cancels and ends the conversation."""
179 user = update.message.from_user
180 logger.info("User %s canceled the conversation.", user.first_name)
181 await update.message.reply_text(
182 text="Setting canceled.",
183 # "Bye! I hope we can talk again some day.",
184 reply_markup=MAIN_MENU
185 )
186
187 return ConversationHandler.END
188
189
190async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
191 """Cancels and ends the conversation."""
192 user = update.message.from_user
193 logger.info("User %s canceled the conversation.", user.first_name)
194 await update.message.reply_text(
195 text="Canceled.",
196 # "Bye! I hope we can talk again some day.",
197 reply_markup=MAIN_MENU
198 )
199
200 return ConversationHandler.END
201
202
203class MsgDict(TypedDict):
204 media_id: str
205 caption: str
206 status_id: int
207 content: str
208 chat_id: int
209
210
211async def media_group_sender(context: CallbackContext):
212 context.job.data = cast(List[MsgDict], context.job.data)
213
214 media_id = []
215 chat_id = context.job.data[0].get("chat_id")
216 for msg_dict in context.job.data:
217 if len(media_id) >= 4:
218 print("Cannot attach more than 4 photos")
219 break
220 file = await context.bot.get_file(msg_dict.get("media_id"))
221 img = io.BytesIO()
222 await file.download_to_memory(img)
223
224 img.seek(0)
225
226 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
227 media_id.append(media["id"])
228
229 mastodon_client.status_update(
230 status=msg_dict.get("content"),
231 id=msg_dict.get("status_id"),
232 media_ids=media_id)
233
234 await context.bot.send_message(chat_id=chat_id, text="Done",
235 reply_markup=MAIN_MENU
236 )
237
238
239async def photo(update: Update, context: CallbackContext):
240 """Stores the photo and asks for a location."""
241 global scheduler
242 await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
243
244 status_id = context.user_data["status_id"]
245 status_content = context.user_data["status_content"]
246
247 message = update.effective_message
248 context.user_data["media"] = []
249 if message.media_group_id:
250 media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
251 msg_dict = {
252 "media_id": media_id,
253 "caption": message.caption_html,
254 "status_id": status_id,
255 "content": status_content,
256 "chat_id": message.chat_id,
257 }
258 jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id))
259 if jobs:
260 jobs[0].data.append(msg_dict)
261 else:
262 context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict],
263 name=str(message.media_group_id))
264 else:
265 file = await update.message.effective_attachment[-1].get_file()
266 img = io.BytesIO()
267 await file.download_to_memory(img)
268 img.seek(0)
269
270 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
271 mastodon_client.status_update(
272 status=status_content,
273 id=status_id,
274 media_ids=media["id"])
275
276 await update.message.reply_text(text="Done",
277 reply_markup=MAIN_MENU
278 )
279
280
281async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
282 print(context.user_data)
283 await update.message.reply_text(
284 text="Done.", reply_markup=MAIN_MENU
285 )
286 return ConversationHandler.END
287
288
289def main() -> None:
290 application = Application.builder().token(BOT_TOKEN).build()
291
292 checkin_handler = ConversationHandler(
293 entry_points=[
294 CommandHandler("start", start),
295 MessageHandler(filters.LOCATION, checkin),
296 ],
297 states={
298 LOCATION: [
299 MessageHandler(filters.LOCATION, checkin),
300 ],
301 WAIT_LOC: [CallbackQueryHandler(process_callback)],
302 LOCATION_SEARCH: [
303 MessageHandler(filters.TEXT, location_search_callback),
304 CommandHandler("skip", skip_location_search),
305 ],
306 PHOTO: [MessageHandler(filters.PHOTO, photo),
307 CommandHandler("skip", skip_photo)],
308 },
309 fallbacks=[CommandHandler("cancel", cancel)],
310 per_message=False,
311 allow_reentry=True,
312 )
313
314 setting_conv_handler = ConversationHandler(
315 entry_points=[CommandHandler("setting", setting)],
316 states={
317 SETTING: [
318 CallbackQueryHandler(setting_process_callback),
319 ],
320 },
321 fallbacks=[CommandHandler("back", setting_cancel)],
322 per_message=False,
323 allow_reentry=True,
324 )
325
326 application.add_handler(CommandHandler("tos", tos))
327 application.add_handler(setting_conv_handler, 2)
328 application.add_handler(checkin_handler, 1)
329
330 # Run the bot until the user presses Ctrl-C
331 application.run_polling()
332
333
334if __name__ == "__main__":
335 main()
Powered by cgit v1.2.3 (git 2.41.0)