From 5b09e9d190b642a4e5571c0e4504c155f56a6f5a Mon Sep 17 00:00:00 2001 From: clarkzjw Date: Tue, 21 Feb 2023 17:39:59 -0800 Subject: bot: enable location search --- bot.py | 335 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ bot/__init__.py | 13 --- bot/bot.py | 300 ------------------------------------------------ foursquare/poi.py | 14 ++- 4 files changed, 343 insertions(+), 319 deletions(-) create mode 100644 bot.py delete mode 100644 bot/__init__.py delete mode 100644 bot/bot.py diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..efde56f --- /dev/null +++ b/bot.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python +# pylint: disable=unused-argument, wrong-import-position +# This program is dedicated to the public domain under the CC0 license. + +import logging +import io +import telegram.constants +from telegram import __version__ as TG_VER + +try: + from telegram import __version_info__ +except ImportError: + __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] + +if __version_info__ < (20, 0, 0, "alpha", 1): + raise RuntimeError( + f"This example is not compatible with your current PTB version {TG_VER}. To view the " + f"{TG_VER} version of this example, " + f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" + ) + +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton +from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext +from config import BOT_TOKEN +from foursquare.poi import query_poi +from dbstore.dbm_store import get_loc +from toot import mastodon_client +from typing import TypedDict, List, cast + +scheduler = None +PRIVACY, TOOT = map(chr, range(8, 10)) + +WAIT_LOC, LOCATION, LOCATION_SEARCH, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(7) + +# Enable logging +logging.basicConfig( + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO +) +logger = logging.getLogger(__name__) + +MAIN_MENU = ReplyKeyboardMarkup([ + [telegram.KeyboardButton(text="/check in", request_location=True)], + [telegram.KeyboardButton(text="/cancel")], + [telegram.KeyboardButton(text="/setting")] +]) + +SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]]) +SETTING_MENU = ReplyKeyboardMarkup( + [ + [KeyboardButton(text="/tos")], + [telegram.KeyboardButton(text="/back")], + ] +) + + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + hello = "Hello, this is `checkin.bot`. \n\n" \ + "This is a Telegram bot with functionality similar to Foursquare Swarm, " \ + "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \ + "Aware of privacy concerns, this bot will not store your location data." \ + "*Be safe and cautious when sharing your real time location on the web.* \n\n" \ + "Start using this bot by sharing your location using Telegram context menu to it." + + await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) + await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU) + + return LOCATION + + +async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + context.user_data["latitude"] = update.message.location.latitude + context.user_data["longitude"] = update.message.location.longitude + await update.message.reply_text("You can input location search keywords or press skip", reply_markup=SKIP_MENU) + + return LOCATION_SEARCH + + +async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query = update.callback_query + await query.answer() + print(query.data) + context.user_data["fsq_id"] = query.data + await query.delete_message() + + poi = get_loc(context.user_data["fsq_id"]) + media_id = [] + content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" + status = mastodon_client.status_post( + content, + visibility="private", + media_ids=media_id) + + context.user_data["status_id"] = status["id"] + context.user_data["status_content"] = content + + print("status_id", context.user_data["status_id"]) + + await query.message.reply_text( + text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", + parse_mode=telegram.constants.ParseMode.MARKDOWN, + reply_markup=MAIN_MENU + ) + + await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU) + return PHOTO + + +async def location_search_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + location_search = update.effective_message.text + latitude = context.user_data["latitude"] + longitude = context.user_data["longitude"] + + keyboard = [] + + for poi in query_poi(location_search, latitude, longitude): + keyboard.append([ + InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), + ]) + + reply_markup = InlineKeyboardMarkup(keyboard) + context.user_data["location_search"] = location_search + await update.message.reply_text("Where are you? ", reply_markup=reply_markup) + + return WAIT_LOC + + +async def skip_location_search(update: Update, context: ContextTypes.DEFAULT_TYPE): + return WAIT_LOC + + +async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await update.message.reply_text("TOS", reply_markup=MAIN_MENU) + + +async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await update.message.reply_text("Setting", reply_markup=SETTING_MENU) + return SETTING + + +async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU) + return ConversationHandler.END + + +async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): + await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) + + fsq_id = context.user_data["fsq_id"] + poi = get_loc(context.user_data["fsq_id"]) + media_id = [] + + if context.user_data.get("photo") is not None: + media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") + media_id = [media["id"]] + # else: + # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) + # if photo_url is not None: + # with urllib.request.urlopen(photo_url) as response: + # data = response.read() + # media = mastodon_client.media_post(data, mime_type="image/jpeg") + # media_id = [media["id"]] + + mastodon_client.status_post( + f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", + visibility="private", + media_ids=media_id) + + await update.message.delete_message() + return ConversationHandler.END + + +async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Displays info on how to use the bot.""" + await update.message.reply_text("Use /start to test this bot.") + + +async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Cancels and ends the conversation.""" + user = update.message.from_user + logger.info("User %s canceled the conversation.", user.first_name) + await update.message.reply_text( + text="Setting canceled.", + # "Bye! I hope we can talk again some day.", + reply_markup=MAIN_MENU + ) + + return ConversationHandler.END + + +async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Cancels and ends the conversation.""" + user = update.message.from_user + logger.info("User %s canceled the conversation.", user.first_name) + await update.message.reply_text( + text="Canceled.", + # "Bye! I hope we can talk again some day.", + reply_markup=MAIN_MENU + ) + + return ConversationHandler.END + + +class MsgDict(TypedDict): + media_id: str + caption: str + status_id: int + content: str + chat_id: int + + +async def media_group_sender(context: CallbackContext): + context.job.data = cast(List[MsgDict], context.job.data) + + media_id = [] + chat_id = context.job.data[0].get("chat_id") + for msg_dict in context.job.data: + if len(media_id) >= 4: + print("Cannot attach more than 4 photos") + break + file = await context.bot.get_file(msg_dict.get("media_id")) + img = io.BytesIO() + await file.download_to_memory(img) + + img.seek(0) + + media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") + media_id.append(media["id"]) + + mastodon_client.status_update( + status=msg_dict.get("content"), + id=msg_dict.get("status_id"), + media_ids=media_id) + + await context.bot.send_message(chat_id=chat_id, text="Done", + reply_markup=MAIN_MENU + ) + + +async def photo(update: Update, context: CallbackContext): + """Stores the photo and asks for a location.""" + global scheduler + await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) + + status_id = context.user_data["status_id"] + status_content = context.user_data["status_content"] + + message = update.effective_message + context.user_data["media"] = [] + if message.media_group_id: + media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id + msg_dict = { + "media_id": media_id, + "caption": message.caption_html, + "status_id": status_id, + "content": status_content, + "chat_id": message.chat_id, + } + jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) + if jobs: + jobs[0].data.append(msg_dict) + else: + context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], + name=str(message.media_group_id)) + else: + file = await update.message.effective_attachment[-1].get_file() + img = io.BytesIO() + await file.download_to_memory(img) + img.seek(0) + + media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") + mastodon_client.status_update( + status=status_content, + id=status_id, + media_ids=media["id"]) + + await update.message.reply_text(text="Done", + reply_markup=MAIN_MENU + ) + + +async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): + print(context.user_data) + await update.message.reply_text( + text="Done.", reply_markup=MAIN_MENU + ) + return ConversationHandler.END + + +def main() -> None: + application = Application.builder().token(BOT_TOKEN).build() + + checkin_handler = ConversationHandler( + entry_points=[ + CommandHandler("start", start), + MessageHandler(filters.LOCATION, checkin), + ], + states={ + LOCATION: [ + MessageHandler(filters.LOCATION, checkin), + ], + WAIT_LOC: [CallbackQueryHandler(process_callback)], + LOCATION_SEARCH: [ + MessageHandler(filters.TEXT, location_search_callback), + CommandHandler("skip", skip_location_search), + ], + PHOTO: [MessageHandler(filters.PHOTO, photo), + CommandHandler("skip", skip_photo)], + }, + fallbacks=[CommandHandler("cancel", cancel)], + per_message=False, + allow_reentry=True, + ) + + setting_conv_handler = ConversationHandler( + entry_points=[CommandHandler("setting", setting)], + states={ + SETTING: [ + CallbackQueryHandler(setting_process_callback), + ], + }, + fallbacks=[CommandHandler("back", setting_cancel)], + per_message=False, + allow_reentry=True, + ) + + application.add_handler(CommandHandler("tos", tos)) + application.add_handler(setting_conv_handler, 2) + application.add_handler(checkin_handler, 1) + + # Run the bot until the user presses Ctrl-C + application.run_polling() + + +if __name__ == "__main__": + main() diff --git a/bot/__init__.py b/bot/__init__.py deleted file mode 100644 index 5e3c341..0000000 --- a/bot/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from telegram import __version__ as TG_VER - -try: - from telegram import __version_info__ -except ImportError: - __version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] - -if __version_info__ < (20, 0, 0, "alpha", 1): - raise RuntimeError( - f"This example is not compatible with your current PTB version {TG_VER}. To view the " - f"{TG_VER} version of this example, " - f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" - ) diff --git a/bot/bot.py b/bot/bot.py deleted file mode 100644 index 2710184..0000000 --- a/bot/bot.py +++ /dev/null @@ -1,300 +0,0 @@ -#!/usr/bin/env python -# pylint: disable=unused-argument, wrong-import-position -# This program is dedicated to the public domain under the CC0 license. - -import logging -import io -import telegram.constants -from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton -from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, ConversationHandler, CallbackContext -from ..config import BOT_TOKEN -from foursquare.poi import query_poi -from dbstore.dbm_store import get_loc -from toot import mastodon_client -from typing import TypedDict, List, cast - -scheduler = None -PRIVACY, TOOT = map(chr, range(8, 10)) - -WAIT_LOC, LOCATION, PHOTO, PROCESS_PHOTO, FINAL, SETTING = range(6) - -# Enable logging -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO -) -logger = logging.getLogger(__name__) - -MAIN_MENU = ReplyKeyboardMarkup([ - [telegram.KeyboardButton(text="/check in", request_location=True)], - [telegram.KeyboardButton(text="/cancel")], - [telegram.KeyboardButton(text="/setting")] -]) - -SKIP_MENU = ReplyKeyboardMarkup([[telegram.KeyboardButton(text="/skip")]]) -SETTING_MENU = ReplyKeyboardMarkup( - [ - [KeyboardButton(text="/tos")], - [telegram.KeyboardButton(text="/back")], - ] -) - - -async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - hello = "Hello, this is `checkin.bot`. \n\n" \ - "This is a Telegram bot with functionality similar to Foursquare Swarm, " \ - "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \ - "Aware of privacy concerns, this bot will not store your location data." \ - "*Be safe and cautious when sharing your real time location on the web.* \n\n" \ - "Start using this bot by sharing your location using Telegram context menu to it." - - await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN) - await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU) - - return LOCATION - - -async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - keyboard = [] - - for poi in query_poi(update.message.location.latitude, update.message.location.longitude): - keyboard.append([ - InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]), - ]) - - reply_markup = InlineKeyboardMarkup(keyboard) - await update.message.reply_text("Where are you?", reply_markup=reply_markup) - - return WAIT_LOC - - -async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - query = update.callback_query - await query.answer() - print(query.data) - context.user_data["fsq_id"] = query.data - await query.delete_message() - - poi = get_loc(context.user_data["fsq_id"]) - media_id = [] - content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}" - status = mastodon_client.status_post( - content, - visibility="private", - media_ids=media_id) - - context.user_data["status_id"] = status["id"] - context.user_data["status_content"] = content - - print("status_id", context.user_data["status_id"]) - - await query.message.reply_text( - text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - reply_markup=MAIN_MENU - ) - - await query.message.reply_text("You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU) - return PHOTO - - -async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await update.message.reply_text("TOS", reply_markup=MAIN_MENU) - - -async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await update.message.reply_text("Setting", reply_markup=SETTING_MENU) - return SETTING - - -async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): - await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU) - return ConversationHandler.END - - -async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE): - await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) - - fsq_id = context.user_data["fsq_id"] - poi = get_loc(context.user_data["fsq_id"]) - media_id = [] - - if context.user_data.get("photo") is not None: - media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg") - media_id = [media["id"]] - # else: - # photo_url = get_poi_top_photo(context.user_data["fsq_id"]) - # if photo_url is not None: - # with urllib.request.urlopen(photo_url) as response: - # data = response.read() - # media = mastodon_client.media_post(data, mime_type="image/jpeg") - # media_id = [media["id"]] - - mastodon_client.status_post( - f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}", - visibility="private", - media_ids=media_id) - - await update.message.delete_message() - return ConversationHandler.END - - -async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Displays info on how to use the bot.""" - await update.message.reply_text("Use /start to test this bot.") - - -async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Cancels and ends the conversation.""" - user = update.message.from_user - logger.info("User %s canceled the conversation.", user.first_name) - await update.message.reply_text( - text="Setting canceled.", - # "Bye! I hope we can talk again some day.", - reply_markup=MAIN_MENU - ) - - return ConversationHandler.END - - -async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - """Cancels and ends the conversation.""" - user = update.message.from_user - logger.info("User %s canceled the conversation.", user.first_name) - await update.message.reply_text( - text="Canceled.", - # "Bye! I hope we can talk again some day.", - reply_markup=MAIN_MENU - ) - - return ConversationHandler.END - - -class MsgDict(TypedDict): - media_id: str - caption: str - status_id: int - content: str - chat_id: int - - -async def media_group_sender(context: CallbackContext): - context.job.data = cast(List[MsgDict], context.job.data) - - media_id = [] - chat_id = context.job.data[0].get("chat_id") - for msg_dict in context.job.data: - if len(media_id) >= 4: - print("Cannot attach more than 4 photos") - break - file = await context.bot.get_file(msg_dict.get("media_id")) - img = io.BytesIO() - await file.download_to_memory(img) - - img.seek(0) - - media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") - media_id.append(media["id"]) - - mastodon_client.status_update( - status=msg_dict.get("content"), - id=msg_dict.get("status_id"), - media_ids=media_id) - - await context.bot.send_message(chat_id=chat_id, text="Done", - reply_markup=MAIN_MENU - ) - - -async def photo(update: Update, context: CallbackContext): - """Stores the photo and asks for a location.""" - global scheduler - await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING) - - status_id = context.user_data["status_id"] - status_content = context.user_data["status_content"] - - message = update.effective_message - context.user_data["media"] = [] - if message.media_group_id: - media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id - msg_dict = { - "media_id": media_id, - "caption": message.caption_html, - "status_id": status_id, - "content": status_content, - "chat_id": message.chat_id, - } - jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id)) - if jobs: - jobs[0].data.append(msg_dict) - else: - context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict], - name=str(message.media_group_id)) - else: - file = await update.message.effective_attachment[-1].get_file() - img = io.BytesIO() - await file.download_to_memory(img) - img.seek(0) - - media = mastodon_client.media_post(img.read(), mime_type="image/jpeg") - mastodon_client.status_update( - status=status_content, - id=status_id, - media_ids=media["id"]) - - await update.message.reply_text(text="Done", - reply_markup=MAIN_MENU - ) - - -async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): - print(context.user_data) - await update.message.reply_text( - text="Done.", reply_markup=MAIN_MENU - ) - return ConversationHandler.END - - -def main() -> None: - application = Application.builder().token(BOT_TOKEN).build() - - checkin_handler = ConversationHandler( - entry_points=[ - CommandHandler("start", start), - MessageHandler(filters.LOCATION, checkin), - ], - states={ - LOCATION: [ - MessageHandler(filters.LOCATION, checkin), - ], - WAIT_LOC: [CallbackQueryHandler(process_callback)], - PHOTO: [MessageHandler(filters.PHOTO, photo), - CommandHandler("skip", skip_photo)], - }, - fallbacks=[CommandHandler("cancel", cancel)], - per_message=False, - allow_reentry=True, - ) - - setting_conv_handler = ConversationHandler( - entry_points=[CommandHandler("setting", setting)], - states={ - SETTING: [ - CallbackQueryHandler(setting_process_callback), - ], - }, - fallbacks=[CommandHandler("back", setting_cancel)], - per_message=False, - allow_reentry=True, - ) - - application.add_handler(CommandHandler("tos", tos)) - application.add_handler(setting_conv_handler, 2) - application.add_handler(checkin_handler, 1) - - # Run the bot until the user presses Ctrl-C - application.run_polling() - - -if __name__ == "__main__": - main() diff --git a/foursquare/poi.py b/foursquare/poi.py index 722dbb7..5e67d1c 100644 --- a/foursquare/poi.py +++ b/foursquare/poi.py @@ -5,7 +5,8 @@ import requests from config import FSQ_API_KEY from dbstore.dbm_store import store_loc -POI_API_ENDPOINT = "https://api.foursquare.com/v3/places/nearby?ll={}%2C{}" +POI_API_ENDPOINT = "https://api.foursquare.com/v3/places/nearby?ll={}%2C{}&limit=10" +POI_SEARCH_API_ENDPOINT = "https://api.foursquare.com/v3/places/search?query={}&ll={}%2C{}&radius=2000&limit=10" POI_PHOTO_ENDPOINT = "https://api.foursquare.com/v3/places/{}/photos?sort=POPULAR&limit=10" OSM_ENDPOINT = "https://www.openstreetmap.org/?mlat={}&mlon={}&zoom=15&layers=M" headers = { @@ -14,24 +15,25 @@ headers = { } -def query_poi(latitude, longitude): +def query_poi(search, latitude, longitude): locations = list() - url = POI_API_ENDPOINT.format(latitude, longitude) - + url = POI_SEARCH_API_ENDPOINT.format(search, latitude, longitude) + print(url) response = requests.get(url, headers=headers) for poi in json.loads(response.text)["results"]: loc = { "fsq_id": poi["fsq_id"], "name": poi["name"], - "locality": poi["location"]["locality"], - "region": poi["location"]["region"], + "locality": poi["location"]["locality"] if "locality" in poi["location"] else "", + "region": poi["location"]["region"] if "region" in poi["location"] else "", "latitude": poi["geocodes"]["main"]["latitude"], "longitude": poi["geocodes"]["main"]["longitude"], "osm_url": OSM_ENDPOINT.format(poi["geocodes"]["main"]["latitude"], poi["geocodes"]["main"]["longitude"]) } locations.append(loc) + print(loc) store_loc(loc) return locations -- cgit v1.2.3