aboutsummaryrefslogtreecommitdiff
path: root/bot.py
diff options
context:
space:
mode:
authorclarkzjw <[email protected]>2023-02-21 13:00:07 -0800
committerclarkzjw <[email protected]>2023-02-21 13:00:07 -0800
commit431e8abcc4bf858e4ca945384a2f60702f1b5a4c (patch)
treea586f47e4a2d150a6961c6c914d0789fcf446d10 /bot.py
parent6602a954788e26be63c00369f6bf72822752c84b (diff)
downloadswarm2fediverse-431e8abcc4bf858e4ca945384a2f60702f1b5a4c.tar.gz
make bot as a module
Diffstat (limited to 'bot.py')
-rw-r--r--bot.py315
1 files changed, 0 insertions, 315 deletions
diff --git a/bot.py b/bot.py
deleted file mode 100644
index 5c90144..0000000
--- a/bot.py
+++ /dev/null
@@ -1,315 +0,0 @@
1#!/usr/bin/env python
2# pylint: disable=unused-argument, wrong-import-position
3# This program is dedicated to the public domain under the CC0 license.
4
5import logging
6import io
7import telegram.constants
8from telegram import __version__ as TG_VER
9
10try:
11 from telegram import __version_info__
12except ImportError:
13 __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
14
15if __version_info__ < (20, 0, 0, "alpha", 1):
16 raise RuntimeError(
17 f"This example is not compatible with your current PTB version {TG_VER}. To view the "
18 f"{TG_VER} version of this example, "
19 f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
20 )
21from telegram import InlineKeyboardButton, InlineKeyboardMarkup
22from telegram.ext import Application, CallbackQueryHandler, \
23 CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext
24from config import BOT_TOKEN
25from foursquare.poi import query_poi
26from dbstore.dbm_store import get_loc
27from toot import mastodon_client
28from typing import TypedDict, List, cast
29from telegram import Update, ReplyKeyboardMarkup, KeyboardButton
30
31scheduler = None
32PRIVACY, TOOT = map(chr, range(8, 10))
33
34WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(6)
35
36# Enable logging
37logging.basicConfig(
38 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
39)
40logger = logging.getLogger(__name__)
41
42MAIN_MENU = ReplyKeyboardMarkup([
43 [telegram.KeyboardButton(text="/check in", request_location=True)],
44 [telegram.KeyboardButton(text="/cancel")],
45 [telegram.KeyboardButton(text="/setting")]
46])
47
48SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]])
49SETTING_MENU = ReplyKeyboardMarkup(
50 [
51 [KeyboardButton(text="/tos")],
52 [telegram.KeyboardButton(text="/back")],
53 ]
54)
55
56
57async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
58 hello = "Hello, this is `checkin.bot`. \n\n" \
59 "This is a Telegram bot with functionality similar to Foursquare Swarm, " \
60 "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \
61 "Aware of privacy concerns, this bot will not store your location data." \
62 "*Be safe and cautious when sharing your real time location on the web.* \n\n" \
63 "Start using this bot by sharing your location using Telegram context menu to it."
64
65 await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN)
66 await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU)
67
68 return LOCATION
69
70
71async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
72 keyboard = []
73
74 for poi in query_poi(update.message.location.latitude, update.message.location.longitude):
75 keyboard.append([
76 InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
77 ])
78
79 reply_markup = InlineKeyboardMarkup(keyboard)
80 await update.message.reply_text("Where are you?", reply_markup=reply_markup)
81
82 return WAIT_LOC
83
84
85async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
86 query = update.callback_query
87 await query.answer()
88 print(query.data)
89 context.user_data["fsq_id"] = query.data
90 await query.delete_message()
91
92 poi = get_loc(context.user_data["fsq_id"])
93 media_id = []
94 content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
95 status = mastodon_client.status_post(
96 content,
97 visibility="private",
98 media_ids=media_id)
99
100 context.user_data["status_id"] = status["id"]
101 context.user_data["status_content"] = content
102
103 print("status_id", context.user_data["status_id"])
104
105 await query.message.reply_text(
106 text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}",
107 parse_mode=telegram.constants.ParseMode.MARKDOWN,
108 reply_markup=MAIN_MENU
109 )
110
111 await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU)
112 return PHOTO
113
114
115async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
116 await update.message.reply_text("TOS", reply_markup=MAIN_MENU)
117
118
119async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
120 await update.message.reply_text("Setting", reply_markup=SETTING_MENU)
121 return SETTING
122
123
124async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
125 await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU)
126 return ConversationHandler.END
127
128
129async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE):
130 await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
131
132 fsq_id = context.user_data["fsq_id"]
133 poi = get_loc(context.user_data["fsq_id"])
134 media_id = []
135
136 if context.user_data.get("photo") is not None:
137 media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg")
138 media_id = [media["id"]]
139 # else:
140 # photo_url = get_poi_top_photo(context.user_data["fsq_id"])
141 # if photo_url is not None:
142 # with urllib.request.urlopen(photo_url) as response:
143 # data = response.read()
144 # media = mastodon_client.media_post(data, mime_type="image/jpeg")
145 # media_id = [media["id"]]
146
147 mastodon_client.status_post(
148 f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}",
149 visibility="private",
150 media_ids=media_id)
151
152 await update.message.delete_message()
153 return ConversationHandler.END
154
155
156async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
157 """Displays info on how to use the bot."""
158 await update.message.reply_text("Use /start to test this bot.")
159
160
161async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
162 """Cancels and ends the conversation."""
163 user = update.message.from_user
164 logger.info("User %s canceled the conversation.", user.first_name)
165 await update.message.reply_text(
166 text="Setting canceled.",
167 # "Bye! I hope we can talk again some day.",
168 reply_markup=MAIN_MENU
169 )
170
171 return ConversationHandler.END
172
173
174async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
175 """Cancels and ends the conversation."""
176 user = update.message.from_user
177 logger.info("User %s canceled the conversation.", user.first_name)
178 await update.message.reply_text(
179 text="Canceled.",
180 # "Bye! I hope we can talk again some day.",
181 reply_markup=MAIN_MENU
182 )
183
184 return ConversationHandler.END
185
186
187class MsgDict(TypedDict):
188 media_id: str
189 caption: str
190 status_id: int
191 content: str
192 chat_id: int
193
194
195async def media_group_sender(context: CallbackContext):
196 context.job.data = cast(List[MsgDict], context.job.data)
197
198 media_id = []
199 chat_id = context.job.data[0].get("chat_id")
200 for msg_dict in context.job.data:
201 if len(media_id) >= 4:
202 print("Cannot attach more than 4 photos")
203 break
204 file = await context.bot.get_file(msg_dict.get("media_id"))
205 img = io.BytesIO()
206 await file.download_to_memory(img)
207
208 img.seek(0)
209
210 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
211 media_id.append(media["id"])
212
213 mastodon_client.status_update(
214 status=msg_dict.get("content"),
215 id=msg_dict.get("status_id"),
216 media_ids=media_id)
217
218 await context.bot.send_message(chat_id=chat_id, text="Done",
219 reply_markup=MAIN_MENU
220 )
221
222
223async def photo(update: Update, context: CallbackContext):
224 """Stores the photo and asks for a location."""
225 global scheduler
226 await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
227
228 status_id = context.user_data["status_id"]
229 status_content = context.user_data["status_content"]
230
231 message = update.effective_message
232 context.user_data["media"] = []
233 if message.media_group_id:
234 media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
235 msg_dict = {
236 "media_id": media_id,
237 "caption": message.caption_html,
238 "status_id": status_id,
239 "content": status_content,
240 "chat_id": message.chat_id,
241 }
242 jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id))
243 if jobs:
244 jobs[0].data.append(msg_dict)
245 else:
246 context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict],
247 name=str(message.media_group_id))
248 else:
249 file = await update.message.effective_attachment[-1].get_file()
250 img = io.BytesIO()
251 await file.download_to_memory(img)
252 img.seek(0)
253
254 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
255 mastodon_client.status_update(
256 status=status_content,
257 id=status_id,
258 media_ids=media["id"])
259
260 await update.message.reply_text(text="Done",
261 reply_markup=MAIN_MENU
262 )
263
264
265async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
266 print(context.user_data)
267 await update.message.reply_text(
268 text="Done.", reply_markup=MAIN_MENU
269 )
270 return ConversationHandler.END
271
272
273def main() -> None:
274 application = Application.builder().token(BOT_TOKEN).build()
275
276 checkin_handler = ConversationHandler(
277 entry_points=[
278 CommandHandler("start", start),
279 MessageHandler(filters.LOCATION, checkin),
280 ],
281 states={
282 LOCATION: [
283 MessageHandler(filters.LOCATION, checkin),
284 ],
285 WAIT_LOC: [CallbackQueryHandler(process_callback)],
286 PHOTO: [MessageHandler(filters.PHOTO, photo),
287 CommandHandler("skip", skip_photo)],
288 },
289 fallbacks=[CommandHandler("cancel", cancel)],
290 per_message=False,
291 allow_reentry=True,
292 )
293
294 setting_conv_handler = ConversationHandler(
295 entry_points=[CommandHandler("setting", setting)],
296 states={
297 SETTING: [
298 CallbackQueryHandler(setting_process_callback),
299 ],
300 },
301 fallbacks=[CommandHandler("back", setting_cancel)],
302 per_message=False,
303 allow_reentry=True,
304 )
305
306 application.add_handler(CommandHandler("tos", tos))
307 application.add_handler(setting_conv_handler, 2)
308 application.add_handler(checkin_handler, 1)
309
310 # Run the bot until the user presses Ctrl-C
311 application.run_polling()
312
313
314if __name__ == "__main__":
315 main()
Powered by cgit v1.2.3 (git 2.41.0)