aboutsummaryrefslogtreecommitdiff
path: root/bot.py
diff options
context:
space:
mode:
authorclarkzjw <[email protected]>2023-02-21 16:46:11 -0800
committerclarkzjw <[email protected]>2023-02-21 16:46:11 -0800
commit22fb6a357f1b6b91f04a180604e2f992280ee661 (patch)
tree0ed80d12ac41475138077891ffea0bb78fe392cb /bot.py
parent0ce8037dcb01745edc9930b09e72bf7ea4dfcf4e (diff)
downloadswarm2fediverse-22fb6a357f1b6b91f04a180604e2f992280ee661.tar.gz
add django app templatefeature/django
Diffstat (limited to 'bot.py')
-rw-r--r--bot.py314
1 files changed, 314 insertions, 0 deletions
diff --git a/bot.py b/bot.py
new file mode 100644
index 0000000..c2e102d
--- /dev/null
+++ b/bot.py
@@ -0,0 +1,314 @@
1#!/usr/bin/env python
2# pylint: disable=unused-argument, wrong-import-position
3# This program is dedicated to the public domain under the CC0 license.
4
5import logging
6import io
7import telegram.constants
8from telegram import __version__ as TG_VER
9
10try:
11 from telegram import __version_info__
12except ImportError:
13 __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
14
15if __version_info__ < (20, 0, 0, "alpha", 1):
16 raise RuntimeError(
17 f"This example is not compatible with your current PTB version {TG_VER}. To view the "
18 f"{TG_VER} version of this example, "
19 f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
20 )
21
22from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
23from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext
24from ..config import BOT_TOKEN
25from foursquare.poi import query_poi
26from dbstore.dbm_store import get_loc
27from toot import mastodon_client
28from typing import TypedDict, List, cast
29
30scheduler = None
31PRIVACY, TOOT = map(chr, range(8, 10))
32
33WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(6)
34
35# Enable logging
36logging.basicConfig(
37 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
38)
39logger = logging.getLogger(__name__)
40
41MAIN_MENU = ReplyKeyboardMarkup([
42 [telegram.KeyboardButton(text="/check in", request_location=True)],
43 [telegram.KeyboardButton(text="/cancel")],
44 [telegram.KeyboardButton(text="/setting")]
45])
46
47SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]])
48SETTING_MENU = ReplyKeyboardMarkup(
49 [
50 [KeyboardButton(text="/tos")],
51 [telegram.KeyboardButton(text="/back")],
52 ]
53)
54
55
56async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
57 hello = "Hello, this is `checkin.bot`. \n\n" \
58 "This is a Telegram bot with functionality similar to Foursquare Swarm, " \
59 "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \
60 "Aware of privacy concerns, this bot will not store your location data." \
61 "*Be safe and cautious when sharing your real time location on the web.* \n\n" \
62 "Start using this bot by sharing your location using Telegram context menu to it."
63
64 await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN)
65 await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU)
66
67 return LOCATION
68
69
70async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
71 keyboard = []
72
73 for poi in query_poi(update.message.location.latitude, update.message.location.longitude):
74 keyboard.append([
75 InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
76 ])
77
78 reply_markup = InlineKeyboardMarkup(keyboard)
79 await update.message.reply_text("Where are you?", reply_markup=reply_markup)
80
81 return WAIT_LOC
82
83
84async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
85 query = update.callback_query
86 await query.answer()
87 print(query.data)
88 context.user_data["fsq_id"] = query.data
89 await query.delete_message()
90
91 poi = get_loc(context.user_data["fsq_id"])
92 media_id = []
93 content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
94 status = mastodon_client.status_post(
95 content,
96 visibility="private",
97 media_ids=media_id)
98
99 context.user_data["status_id"] = status["id"]
100 context.user_data["status_content"] = content
101
102 print("status_id", context.user_data["status_id"])
103
104 await query.message.reply_text(
105 text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}",
106 parse_mode=telegram.constants.ParseMode.MARKDOWN,
107 reply_markup=MAIN_MENU
108 )
109
110 await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU)
111 return PHOTO
112
113
114async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
115 await update.message.reply_text("TOS", reply_markup=MAIN_MENU)
116
117
118async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
119 await update.message.reply_text("Setting", reply_markup=SETTING_MENU)
120 return SETTING
121
122
123async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
124 await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU)
125 return ConversationHandler.END
126
127
128async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE):
129 await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
130
131 fsq_id = context.user_data["fsq_id"]
132 poi = get_loc(context.user_data["fsq_id"])
133 media_id = []
134
135 if context.user_data.get("photo") is not None:
136 media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg")
137 media_id = [media["id"]]
138 # else:
139 # photo_url = get_poi_top_photo(context.user_data["fsq_id"])
140 # if photo_url is not None:
141 # with urllib.request.urlopen(photo_url) as response:
142 # data = response.read()
143 # media = mastodon_client.media_post(data, mime_type="image/jpeg")
144 # media_id = [media["id"]]
145
146 mastodon_client.status_post(
147 f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}",
148 visibility="private",
149 media_ids=media_id)
150
151 await update.message.delete_message()
152 return ConversationHandler.END
153
154
155async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
156 """Displays info on how to use the bot."""
157 await update.message.reply_text("Use /start to test this bot.")
158
159
160async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
161 """Cancels and ends the conversation."""
162 user = update.message.from_user
163 logger.info("User %s canceled the conversation.", user.first_name)
164 await update.message.reply_text(
165 text="Setting canceled.",
166 # "Bye! I hope we can talk again some day.",
167 reply_markup=MAIN_MENU
168 )
169
170 return ConversationHandler.END
171
172
173async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
174 """Cancels and ends the conversation."""
175 user = update.message.from_user
176 logger.info("User %s canceled the conversation.", user.first_name)
177 await update.message.reply_text(
178 text="Canceled.",
179 # "Bye! I hope we can talk again some day.",
180 reply_markup=MAIN_MENU
181 )
182
183 return ConversationHandler.END
184
185
186class MsgDict(TypedDict):
187 media_id: str
188 caption: str
189 status_id: int
190 content: str
191 chat_id: int
192
193
194async def media_group_sender(context: CallbackContext):
195 context.job.data = cast(List[MsgDict], context.job.data)
196
197 media_id = []
198 chat_id = context.job.data[0].get("chat_id")
199 for msg_dict in context.job.data:
200 if len(media_id) >= 4:
201 print("Cannot attach more than 4 photos")
202 break
203 file = await context.bot.get_file(msg_dict.get("media_id"))
204 img = io.BytesIO()
205 await file.download_to_memory(img)
206
207 img.seek(0)
208
209 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
210 media_id.append(media["id"])
211
212 mastodon_client.status_update(
213 status=msg_dict.get("content"),
214 id=msg_dict.get("status_id"),
215 media_ids=media_id)
216
217 await context.bot.send_message(chat_id=chat_id, text="Done",
218 reply_markup=MAIN_MENU
219 )
220
221
222async def photo(update: Update, context: CallbackContext):
223 """Stores the photo and asks for a location."""
224 global scheduler
225 await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
226
227 status_id = context.user_data["status_id"]
228 status_content = context.user_data["status_content"]
229
230 message = update.effective_message
231 context.user_data["media"] = []
232 if message.media_group_id:
233 media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
234 msg_dict = {
235 "media_id": media_id,
236 "caption": message.caption_html,
237 "status_id": status_id,
238 "content": status_content,
239 "chat_id": message.chat_id,
240 }
241 jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id))
242 if jobs:
243 jobs[0].data.append(msg_dict)
244 else:
245 context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict],
246 name=str(message.media_group_id))
247 else:
248 file = await update.message.effective_attachment[-1].get_file()
249 img = io.BytesIO()
250 await file.download_to_memory(img)
251 img.seek(0)
252
253 media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
254 mastodon_client.status_update(
255 status=status_content,
256 id=status_id,
257 media_ids=media["id"])
258
259 await update.message.reply_text(text="Done",
260 reply_markup=MAIN_MENU
261 )
262
263
264async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
265 print(context.user_data)
266 await update.message.reply_text(
267 text="Done.", reply_markup=MAIN_MENU
268 )
269 return ConversationHandler.END
270
271
272def main() -> None:
273 application = Application.builder().token(BOT_TOKEN).build()
274
275 checkin_handler = ConversationHandler(
276 entry_points=[
277 CommandHandler("start", start),
278 MessageHandler(filters.LOCATION, checkin),
279 ],
280 states={
281 LOCATION: [
282 MessageHandler(filters.LOCATION, checkin),
283 ],
284 WAIT_LOC: [CallbackQueryHandler(process_callback)],
285 PHOTO: [MessageHandler(filters.PHOTO, photo),
286 CommandHandler("skip", skip_photo)],
287 },
288 fallbacks=[CommandHandler("cancel", cancel)],
289 per_message=False,
290 allow_reentry=True,
291 )
292
293 setting_conv_handler = ConversationHandler(
294 entry_points=[CommandHandler("setting", setting)],
295 states={
296 SETTING: [
297 CallbackQueryHandler(setting_process_callback),
298 ],
299 },
300 fallbacks=[CommandHandler("back", setting_cancel)],
301 per_message=False,
302 allow_reentry=True,
303 )
304
305 application.add_handler(CommandHandler("tos", tos))
306 application.add_handler(setting_conv_handler, 2)
307 application.add_handler(checkin_handler, 1)
308
309 # Run the bot until the user presses Ctrl-C
310 application.run_polling()
311
312
313if __name__ == "__main__":
314 main()
Powered by cgit v1.2.3 (git 2.41.0)