aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorclarkzjw <[email protected]>2023-02-21 23:32:21 -0800
committerclarkzjw <[email protected]>2023-02-21 23:32:21 -0800
commit3a76618d0f46c7e28641b74a869544661c504e21 (patch)
tree47af0b7b4dfe9ad5ad6f710c6efe9ab99ab35165
parentefc952972f97f98814b2e9a49648766101cd0725 (diff)
downloadswarm2fediverse-3a76618d0f46c7e28641b74a869544661c504e21.tar.gz
refactoring code
-rw-r--r--bot.py329
-rw-r--r--callback.py266
-rw-r--r--command.py33
-rw-r--r--config.py36
4 files changed, 339 insertions, 325 deletions
diff --git a/bot.py b/bot.py
index f238e75..f2be049 100644
--- a/bot.py
+++ b/bot.py
@@ -1,342 +1,21 @@
1#!/usr/bin/env python 1#!/usr/bin/env python
2 2
3import telegram.constants
4from telegram import __version__ as TG_VER
5
6try:
7 from telegram import __version_info__
8except ImportError:
9 __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
10
11if __version_info__ < (20, 0, 0, "alpha", 1):
12 raise RuntimeError(
13 f"This example is not compatible with your current PTB version {TG_VER}. To view the "
14 f"{TG_VER} version of this example, "
15 f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
16 )
17
18import logging 3import logging
19from pprint import pprint 4from callback import *
20import io 5from telegram.ext import Application, CallbackQueryHandler, CommandHandler, MessageHandler, filters, ConversationHandler
21from foursquare.poi import OSM_ENDPOINT
22from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
23from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \
24 ConversationHandler, CallbackContext
25from config import BOT_TOKEN, MEDIA_GROUP_TIMEOUT
26from foursquare.poi import query_poi
27from dbstore.dbm_store import get_loc
28from toot import mastodon_client
29from typing import TypedDict, List, cast
30from prompt.string import *
31
32 6
33logging.basicConfig( 7logging.basicConfig(
34 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO 8 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
35) 9)
36logger = logging.getLogger(__name__) 10logger = logging.getLogger(__name__)
37 11
38WAIT_LOCATION, LOCATION_SEARCH_KEYWORD, LOCATION_CONFIRMATION, ADD_MEDIA, ADD_COMMENT = range(5)
39
40MAIN_MENU = ReplyKeyboardMarkup([
41 [KeyboardButton(text="Check-in here", request_location=True)],
42])
43
44SKIP_LOCATION_SEARCH = CALLBACK_SKIP
45INLINE_SKIP_MENU = InlineKeyboardMarkup([
46 [telegram.InlineKeyboardButton("Skip", callback_data=SKIP_LOCATION_SEARCH)]
47])
48
49
50class MsgDict(TypedDict):
51 media_id: str
52 caption: str
53 status_id: int
54 content: str
55 chat_id: int
56
57
58async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
59 hello = "Hello, this is `checkin.bot`. \n\n" \
60 "This is a Telegram bot with functionality similar to Foursquare Swarm, " \
61 "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \
62 "Aware of privacy concerns, this bot will not store your location data." \
63 "*Be safe and cautious when sharing your real time location on the web.* \n\n" \
64 "Start using this bot by sharing your location using Telegram context menu to it."
65
66 await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN)
67 await update.message.reply_text(PROMPT_CHOOSE_ACTION, reply_markup=MAIN_MENU)
68
69 return WAIT_LOCATION
70
71
72async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
73 if update.message.venue is not None:
74 fsq_id = update.message.venue.foursquare_id
75 title = update.message.venue.title
76 context.user_data["fsq_id"] = fsq_id
77 context.user_data["title"] = title
78 context.user_data["latitude"] = update.message.venue.location.latitude
79 context.user_data["longitude"] = update.message.venue.location.longitude
80
81 poi = get_loc(context.user_data["fsq_id"])
82 media_id = []
83 content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
84 status = mastodon_client.status_post(
85 content,
86 visibility="private",
87 media_ids=media_id)
88
89 context.user_data["status_id"] = status["id"]
90 context.user_data["status_content"] = content
91
92 print("status_id", context.user_data["status_id"])
93
94 await update.message.reply_text(
95 text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}",
96 parse_mode=telegram.constants.ParseMode.MARKDOWN,
97 )
98
99 prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
100 context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id
101
102 return ADD_COMMENT
103 else:
104 context.user_data["latitude"] = update.message.location.latitude
105 context.user_data["longitude"] = update.message.location.longitude
106
107 await update.message.reply_text("Searching...", reply_markup=telegram.ReplyKeyboardRemove())
108 prompt_msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU)
109
110 context.user_data[PROMPT_LOCATION_KEYWORD] = prompt_msg.message_id
111 return LOCATION_SEARCH_KEYWORD
112
113
114async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
115 loc = update.effective_message.text
116 osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"])
117 media_id = []
118 content = f"I'm at {loc}, {osm_url}"
119 status = mastodon_client.status_post(
120 content,
121 visibility="private",
122 media_ids=media_id)
123
124 context.user_data["status_id"] = status["id"]
125 context.user_data["status_content"] = content
126
127 print("status_id", context.user_data["status_id"])
128
129 await update.message.reply_text(
130 text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}",
131 parse_mode=telegram.constants.ParseMode.MARKDOWN,
132 )
133
134 prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
135 context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id
136
137 return ADD_COMMENT
138
139
140async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
141 query = update.callback_query
142 await query.answer()
143 context.user_data["fsq_id"] = query.data
144
145 await query.delete_message()
146
147 poi = get_loc(context.user_data["fsq_id"])
148 media_id = []
149 content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
150 status = mastodon_client.status_post(
151 content,
152 visibility="private",
153 media_ids=media_id)
154
155 context.user_data["status_id"] = status["id"]
156 context.user_data["status_content"] = content
157
158 print("status_id", context.user_data["status_id"])
159
160 await query.message.reply_text(
161 text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}",
162 parse_mode=telegram.constants.ParseMode.MARKDOWN,
163 )
164
165 prompt_attach_comment_msg = await query.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
166 context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id
167
168 return ADD_COMMENT
169
170
171async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
172 await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_LOCATION_KEYWORD])
173
174 location_search = update.effective_message.text
175 latitude = context.user_data["latitude"]
176 longitude = context.user_data["longitude"]
177
178 keyboard = []
179 poi_result = query_poi(location_search, latitude, longitude)
180 if len(poi_result) == 0:
181 poi_result = query_poi("", latitude, longitude)
182
183 for poi in poi_result:
184 keyboard.append([
185 InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
186 ])
187
188 if len(keyboard) == 0:
189 await update.message.reply_text(PROMPT_NO_NEARBY_POI)
190 return LOCATION_CONFIRMATION
191 else:
192 reply_markup = InlineKeyboardMarkup(keyboard)
193 context.user_data["location_search"] = location_search
194 await update.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup)
195
196 return LOCATION_CONFIRMATION
197
198
199async def callback_skip_location_keyword(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
200 query = update.callback_query
201 await query.answer()
202
203 await query.message.delete()
204 latitude = context.user_data["latitude"]
205 longitude = context.user_data["longitude"]
206
207 keyboard = []
208
209 for poi in query_poi("", latitude, longitude):
210 keyboard.append([
211 InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
212 ])
213
214 reply_markup = InlineKeyboardMarkup(keyboard)
215 await query.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup)
216
217 return LOCATION_CONFIRMATION
218
219
220async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
221 await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT])
222 comment = update.effective_message.text
223
224 mastodon_client.status_update(
225 status=f"{comment} " + context.user_data["status_content"],
226 id=context.user_data["status_id"])
227
228 context.user_data["status_content"] = f"{comment} " + context.user_data["status_content"]
229 prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
230 context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id
231
232 return ADD_MEDIA
233
234
235async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
236 await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT])
237 prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
238 context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id
239 return ADD_MEDIA
240
241
242async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
243 await update.message.reply_text(PROMPT_HELP)
244
245
246async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
247 await update.message.reply_text(text=PROMPT_CANCELED, reply_markup=MAIN_MENU)
248
249 return ConversationHandler.END
250
251
252async def process_media_group(context: CallbackContext):
253 context.job.data = cast(List[MsgDict], context.job.data)
254
255 media_id = []
256 chat_id = context.job.data[0].get("chat_id")
257 for msg_dict in context.job.data:
258 if len(media_id) >= 4:
259 await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU)
260 return
261
262 file = await context.bot.get_file(msg_dict.get("media_id"))
263 img = io.BytesIO()
264 await file.download_to_memory(img)
265
266 img.seek(0)
267
268 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
269 media_id.append(media["id"])
270
271 mastodon_client.status_update(
272 status=msg_dict.get("content"),
273 id=msg_dict.get("status_id"),
274 media_ids=media_id)
275
276 await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU)
277
278
279async def callback_add_media(update: Update, context: CallbackContext):
280 await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
281
282 try:
283 await context.bot.delete_message(chat_id=update.message.chat_id,
284 message_id=context.user_data[PROMPT_ADD_MEDIA])
285 except telegram.error.BadRequest as e:
286 if "not found" in str(e.message):
287 pass
288
289 status_id = context.user_data["status_id"]
290 status_content = context.user_data["status_content"]
291
292 message = update.effective_message
293 context.user_data["media"] = []
294 if message.media_group_id:
295 media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
296 msg_dict = {
297 "media_id": media_id,
298 "caption": message.caption_html,
299 "status_id": status_id,
300 "content": status_content,
301 "chat_id": message.chat_id,
302 }
303 jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id))
304 if jobs:
305 jobs[0].data.append(msg_dict)
306 else:
307 context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT,
308 data=[msg_dict], name=str(message.media_group_id))
309 else:
310 file = await update.message.effective_attachment[-1].get_file()
311 img = io.BytesIO()
312 await file.download_to_memory(img)
313 img.seek(0)
314
315 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
316 mastodon_client.status_update(
317 status=status_content,
318 id=status_id,
319 media_ids=media["id"])
320
321 await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
322
323
324async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
325 query = update.callback_query
326 await query.answer()
327
328 await query.delete_message()
329 await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
330
331 return ConversationHandler.END
332
333 12
334def main() -> None: 13def main() -> None:
335 application = Application.builder().token(BOT_TOKEN).build() 14 application = Application.builder().token(BOT_TOKEN).build()
336 15
337 checkin_handler = ConversationHandler( 16 checkin_handler = ConversationHandler(
338 entry_points=[ 17 entry_points=[
339 CommandHandler("start", start), 18 CommandHandler("start", start_command),
340 MessageHandler(filters.LOCATION, callback_location_sharing), 19 MessageHandler(filters.LOCATION, callback_location_sharing),
341 ], 20 ],
342 states={ 21 states={
@@ -358,7 +37,7 @@ def main() -> None:
358 ADD_MEDIA: [MessageHandler(filters.PHOTO, callback_add_media), 37 ADD_MEDIA: [MessageHandler(filters.PHOTO, callback_add_media),
359 CallbackQueryHandler(callback_skip_media)], 38 CallbackQueryHandler(callback_skip_media)],
360 }, 39 },
361 fallbacks=[CommandHandler("cancel", cancel)], 40 fallbacks=[CommandHandler("cancel", cancel_command)],
362 per_message=False, 41 per_message=False,
363 allow_reentry=True, 42 allow_reentry=True,
364 ) 43 )
diff --git a/callback.py b/callback.py
new file mode 100644
index 0000000..8c80028
--- /dev/null
+++ b/callback.py
@@ -0,0 +1,266 @@
1import io
2from foursquare.poi import OSM_ENDPOINT
3from foursquare.poi import query_poi
4from dbstore.dbm_store import get_loc
5from toot import mastodon_client
6from command import *
7from telegram import __version__ as TG_VER
8from typing import cast, List
9from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
10from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \
11 ConversationHandler, CallbackContext
12from telegram.constants import ParseMode, ChatAction
13from telegram.error import BadRequest
14from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardRemove
15
16
17async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
18 query = update.callback_query
19 await query.answer()
20
21 await query.delete_message()
22 await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
23
24 return ConversationHandler.END
25
26
27async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
28 if update.message.venue is not None:
29 fsq_id = update.message.venue.foursquare_id
30 title = update.message.venue.title
31 context.user_data["fsq_id"] = fsq_id
32 context.user_data["title"] = title
33 context.user_data["latitude"] = update.message.venue.location.latitude
34 context.user_data["longitude"] = update.message.venue.location.longitude
35
36 poi = get_loc(context.user_data["fsq_id"])
37 media_id = []
38 content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
39 status = mastodon_client.status_post(
40 content,
41 visibility="private",
42 media_ids=media_id)
43
44 context.user_data["status_id"] = status["id"]
45 context.user_data["status_content"] = content
46
47 print("status_id", context.user_data["status_id"])
48
49 await update.message.reply_text(
50 text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}",
51 parse_mode=ParseMode.MARKDOWN,
52 )
53
54 prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
55 context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id
56
57 return ADD_COMMENT
58 else:
59 context.user_data["latitude"] = update.message.location.latitude
60 context.user_data["longitude"] = update.message.location.longitude
61
62 await update.message.reply_text("Searching...", reply_markup=ReplyKeyboardRemove())
63 prompt_msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU)
64
65 context.user_data[PROMPT_LOCATION_KEYWORD] = prompt_msg.message_id
66 return LOCATION_SEARCH_KEYWORD
67
68
69async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
70 loc = update.effective_message.text
71 osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"])
72 media_id = []
73 content = f"I'm at {loc}, {osm_url}"
74 status = mastodon_client.status_post(
75 content,
76 visibility="private",
77 media_ids=media_id)
78
79 context.user_data["status_id"] = status["id"]
80 context.user_data["status_content"] = content
81
82 print("status_id", context.user_data["status_id"])
83
84 await update.message.reply_text(
85 text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}",
86 parse_mode=ParseMode.MARKDOWN,
87 )
88
89 prompt_attach_comment_msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
90 context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id
91
92 return ADD_COMMENT
93
94
95async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
96 query = update.callback_query
97 await query.answer()
98 context.user_data["fsq_id"] = query.data
99
100 await query.delete_message()
101
102 poi = get_loc(context.user_data["fsq_id"])
103 media_id = []
104 content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
105 status = mastodon_client.status_post(
106 content,
107 visibility="private",
108 media_ids=media_id)
109
110 context.user_data["status_id"] = status["id"]
111 context.user_data["status_content"] = content
112
113 print("status_id", context.user_data["status_id"])
114
115 await query.message.reply_text(
116 text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}",
117 parse_mode=ParseMode.MARKDOWN,
118 )
119
120 prompt_attach_comment_msg = await query.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
121 context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id
122
123 return ADD_COMMENT
124
125
126async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
127 await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_LOCATION_KEYWORD])
128
129 location_search = update.effective_message.text
130 latitude = context.user_data["latitude"]
131 longitude = context.user_data["longitude"]
132
133 keyboard = []
134 poi_result = query_poi(location_search, latitude, longitude)
135 if len(poi_result) == 0:
136 poi_result = query_poi("", latitude, longitude)
137
138 for poi in poi_result:
139 keyboard.append([
140 InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
141 ])
142
143 if len(keyboard) == 0:
144 await update.message.reply_text(PROMPT_NO_NEARBY_POI)
145 return LOCATION_CONFIRMATION
146 else:
147 reply_markup = InlineKeyboardMarkup(keyboard)
148 context.user_data["location_search"] = location_search
149 await update.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup)
150
151 return LOCATION_CONFIRMATION
152
153
154async def callback_skip_location_keyword(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
155 query = update.callback_query
156 await query.answer()
157
158 await query.message.delete()
159 latitude = context.user_data["latitude"]
160 longitude = context.user_data["longitude"]
161
162 keyboard = []
163
164 for poi in query_poi("", latitude, longitude):
165 keyboard.append([
166 InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
167 ])
168
169 reply_markup = InlineKeyboardMarkup(keyboard)
170 await query.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup)
171
172 return LOCATION_CONFIRMATION
173
174
175async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
176 await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT])
177 comment = update.effective_message.text
178
179 mastodon_client.status_update(
180 status=f"{comment} " + context.user_data["status_content"],
181 id=context.user_data["status_id"])
182
183 context.user_data["status_content"] = f"{comment} " + context.user_data["status_content"]
184 prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
185 context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id
186
187 return ADD_MEDIA
188
189
190async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
191 await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT])
192 prompt_attach_photo_msg = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
193 context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id
194 return ADD_MEDIA
195
196
197async def process_media_group(context: CallbackContext):
198 context.job.data = cast(List[MsgDict], context.job.data)
199
200 media_id = []
201 chat_id = context.job.data[0].get("chat_id")
202 for msg_dict in context.job.data:
203 if len(media_id) >= 4:
204 await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU)
205 return
206
207 file = await context.bot.get_file(msg_dict.get("media_id"))
208 img = io.BytesIO()
209 await file.download_to_memory(img)
210
211 img.seek(0)
212
213 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
214 media_id.append(media["id"])
215
216 mastodon_client.status_update(
217 status=msg_dict.get("content"),
218 id=msg_dict.get("status_id"),
219 media_ids=media_id)
220
221 await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU)
222
223
224async def callback_add_media(update: Update, context: CallbackContext):
225 await update.message.reply_chat_action(ChatAction.TYPING)
226
227 try:
228 await context.bot.delete_message(chat_id=update.message.chat_id,
229 message_id=context.user_data[PROMPT_ADD_MEDIA])
230 except BadRequest as e:
231 if "not found" in str(e.message):
232 pass
233
234 status_id = context.user_data["status_id"]
235 status_content = context.user_data["status_content"]
236
237 message = update.effective_message
238 context.user_data["media"] = []
239 if message.media_group_id:
240 media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
241 msg_dict = {
242 "media_id": media_id,
243 "caption": message.caption_html,
244 "status_id": status_id,
245 "content": status_content,
246 "chat_id": message.chat_id,
247 }
248 jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id))
249 if jobs:
250 jobs[0].data.append(msg_dict)
251 else:
252 context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT,
253 data=[msg_dict], name=str(message.media_group_id))
254 else:
255 file = await update.message.effective_attachment[-1].get_file()
256 img = io.BytesIO()
257 await file.download_to_memory(img)
258 img.seek(0)
259
260 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
261 mastodon_client.status_update(
262 status=status_content,
263 id=status_id,
264 media_ids=media["id"])
265
266 await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
diff --git a/command.py b/command.py
new file mode 100644
index 0000000..753b0a2
--- /dev/null
+++ b/command.py
@@ -0,0 +1,33 @@
1from config import *
2
3from telegram import __version__ as TG_VER
4from typing import cast, List
5from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
6from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \
7 ConversationHandler, CallbackContext
8from telegram.constants import ParseMode, ChatAction
9from telegram.error import BadRequest
10from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardRemove
11
12
13async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
14 hello = "Hello, this is `checkin.bot`. \n\n" \
15 "This is a Telegram bot with functionality similar to Foursquare Swarm, " \
16 "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \
17 "Aware of privacy concerns, this bot will not store your location data." \
18 "*Be safe and cautious when sharing your real time location on the web.* \n\n" \
19 "Start using this bot by sharing your location using Telegram context menu to it."
20
21 await update.message.reply_text(hello, parse_mode=ParseMode.MARKDOWN)
22 await update.message.reply_text(PROMPT_CHOOSE_ACTION, reply_markup=MAIN_MENU)
23
24 return WAIT_LOCATION
25
26
27async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
28 await update.message.reply_text(PROMPT_HELP)
29
30
31async def cancel_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
32 await update.message.reply_text(text=PROMPT_CANCELED, reply_markup=MAIN_MENU)
33 return ConversationHandler.END
diff --git a/config.py b/config.py
index 3356d11..b690401 100644
--- a/config.py
+++ b/config.py
@@ -1,5 +1,22 @@
1# https://docs.python.org/3/library/configparser.html 1# https://docs.python.org/3/library/configparser.html
2 2
3from telegram import __version__ as TG_VER
4
5try:
6 from telegram import __version_info__
7except ImportError:
8 __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
9
10if __version_info__ < (20, 0, 0, "alpha", 1):
11 raise RuntimeError(
12 f"This example is not compatible with your current PTB version {TG_VER}. To view the "
13 f"{TG_VER} version of this example, "
14 f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
15 )
16
17from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, KeyboardButton
18from prompt.string import *
19from typing import TypedDict
3import configparser 20import configparser
4 21
5config = configparser.ConfigParser() 22config = configparser.ConfigParser()
@@ -14,3 +31,22 @@ TOOT_CLIENT_SECRET = config["TOOT"]["CLIENT_SECRET"]
14TOOT_ACCESS_TOKEN = config["TOOT"]["ACCESS_TOKEN"] 31TOOT_ACCESS_TOKEN = config["TOOT"]["ACCESS_TOKEN"]
15 32
16MEDIA_GROUP_TIMEOUT = 3 33MEDIA_GROUP_TIMEOUT = 3
34
35WAIT_LOCATION, LOCATION_SEARCH_KEYWORD, LOCATION_CONFIRMATION, ADD_MEDIA, ADD_COMMENT = range(5)
36
37MAIN_MENU = ReplyKeyboardMarkup([
38 [KeyboardButton(text="Check-in here", request_location=True)],
39])
40
41SKIP_LOCATION_SEARCH = CALLBACK_SKIP
42INLINE_SKIP_MENU = InlineKeyboardMarkup([
43 [InlineKeyboardButton("Skip", callback_data=SKIP_LOCATION_SEARCH)]
44])
45
46
47class MsgDict(TypedDict):
48 media_id: str
49 caption: str
50 status_id: int
51 content: str
52 chat_id: int
Powered by cgit v1.2.3 (git 2.41.0)