#!/usr/bin/env python
# pylint: disable=unused-argument, wrong-import-position
# This program is dedicated to the public domain under the CC0 license.
import logging
import io
import telegram.constants
from telegram import __version__ as TG_VER
from pprint import pprint
try:
from telegram import __version_info__
except ImportError:
__version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
if __version_info__ < (20, 0, 0, "alpha", 1):
raise RuntimeError(
f"This example is not compatible with your current PTB version {TG_VER}. To view the "
f"{TG_VER} version of this example, "
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
)
from foursquare.poi import OSM_ENDPOINT
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \
ConversationHandler, CallbackContext
from config import BOT_TOKEN
from foursquare.poi import query_poi
from dbstore.dbm_store import get_loc
from toot import mastodon_client
from typing import TypedDict, List, cast
scheduler = None
PRIVACY, TOOT = map(chr, range(8, 10))
WAIT_LOC, LOCATION, LOCATION_SEARCH, PHOTO, PROCESS_PHOTO, COMMENT, SETTING = range(7)
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
MAIN_MENU = ReplyKeyboardMarkup([
[KeyboardButton(text="Check-in here", request_location=True)],
# [KeyboardButton(text="/cancel")],
# [KeyboardButton(text="/setting")]
])
SKIP_LOCATION_SEARCH = "skip_location_search"
SKIP_MENU = InlineKeyboardMarkup([
[telegram.InlineKeyboardButton("Skip", callback_data=SKIP_LOCATION_SEARCH)]
])
# SETTING_MENU = InlineKeyboardMarkup(
# [
# [InlineKeyboardButton(text="/tos")],
# [InlineKeyboardButton(text="/back")],
# ]
# )
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
hello = "Hello, this is `checkin.bot`. \n\n" \
"This is a Telegram bot with functionality similar to Foursquare Swarm, " \
"but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \
"Aware of privacy concerns, this bot will not store your location data." \
"*Be safe and cautious when sharing your real time location on the web.* \n\n" \
"Start using this bot by sharing your location using Telegram context menu to it."
await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN)
await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU)
return LOCATION
async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
pprint(update.message.location)
if update.message.venue is not None:
fsq_id = update.message.venue.foursquare_id
title = update.message.venue.title
context.user_data["fsq_id"] = fsq_id
context.user_data["title"] = title
context.user_data["latitude"] = update.message.venue.location.latitude
context.user_data["longitude"] = update.message.venue.location.longitude
poi = get_loc(context.user_data["fsq_id"])
media_id = []
content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
status = mastodon_client.status_post(
content,
visibility="private",
media_ids=media_id)
context.user_data["status_id"] = status["id"]
context.user_data["status_content"] = content
print("status_id", context.user_data["status_id"])
await update.message.reply_text(
text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}",
parse_mode=telegram.constants.ParseMode.MARKDOWN,
)
prompt_attach_comment_msg = await update.message.reply_text(
"You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU)
context.user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id
return COMMENT
else:
context.user_data["latitude"] = update.message.location.latitude
context.user_data["longitude"] = update.message.location.longitude
await update.message.reply_text("Searching...", reply_markup=telegram.ReplyKeyboardRemove())
prompt_msg = await update.message.reply_text("You can input location search keywords or press skip",
reply_markup=SKIP_MENU)
context.user_data["prompt_msg_id"] = prompt_msg.message_id
return LOCATION_SEARCH
async def manual_location_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
# query = update.callback_query
# await query.answer()
# await query.delete_message()
loc = update.effective_message.text
osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"])
media_id = []
content = f"I'm at {loc}, {osm_url}"
status = mastodon_client.status_post(
content,
visibility="private",
media_ids=media_id)
context.user_data["status_id"] = status["id"]
context.user_data["status_content"] = content
print("status_id", context.user_data["status_id"])
await update.message.reply_text(
text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}",
parse_mode=telegram.constants.ParseMode.MARKDOWN,
# reply_markup=MAIN_MENU
)
prompt_attach_comment_msg = await update.message.reply_text(
"You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU)
context.user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id
return COMMENT
async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
print("process_callback")
query = update.callback_query
await query.answer()
print(query.data)
context.user_data["fsq_id"] = query.data
await query.delete_message()
poi = get_loc(context.user_data["fsq_id"])
media_id = []
content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
status = mastodon_client.status_post(
content,
visibility="private",
media_ids=media_id)
context.user_data["status_id"] = status["id"]
context.user_data["status_content"] = content
print("status_id", context.user_data["status_id"])
await query.message.reply_text(
text=f"Selected place: {poi['name']}, `{query.data}`\nPosted to Mastodon: {status['url']}",
parse_mode=telegram.constants.ParseMode.MARKDOWN,
# reply_markup=MAIN_MENU
)
prompt_attach_comment_msg = await query.message.reply_text(
"You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU)
context.user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id
return COMMENT
async def location_search_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_msg_id"])
location_search = update.effective_message.text
latitude = context.user_data["latitude"]
longitude = context.user_data["longitude"]
keyboard = []
poi_result = query_poi(location_search, latitude, longitude)
if len(poi_result) == 0:
poi_result = query_poi("", latitude, longitude)
# for poi in poi_result:
# keyboard.append([
# InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
# ])
if len(keyboard) == 0:
await update.message.reply_text("No nearby places found. You can input location name manually")
return WAIT_LOC
else:
reply_markup = InlineKeyboardMarkup(keyboard)
context.user_data["location_search"] = location_search
await update.message.reply_text("Where are you? ", reply_markup=reply_markup)
return WAIT_LOC
async def skip_location_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
print("skip: ", query.data)
await query.message.delete()
latitude = context.user_data["latitude"]
longitude = context.user_data["longitude"]
keyboard = []
for poi in query_poi("", latitude, longitude):
keyboard.append([
InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.reply_text("Where are you? ", reply_markup=reply_markup)
return WAIT_LOC
async def comment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_attach_comment_msg_id"])
comment = update.effective_message.text
mastodon_client.status_update(
status=f"{comment} " + context.user_data["status_content"],
id=context.user_data["status_id"])
context.user_data["status_content"] = f"{comment} " + context.user_data["status_content"]
prompt_attach_photo_msg = await update.message.reply_text(
"You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU)
context.user_data["prompt_attach_photo_msg_id"] = prompt_attach_photo_msg.message_id
return PHOTO
async def skip_comment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_attach_comment_msg_id"])
prompt_attach_photo_msg = await update.message.reply_text(
"You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU)
context.user_data["prompt_attach_photo_msg_id"] = prompt_attach_photo_msg.message_id
return PHOTO
# async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# await update.message.reply_text("TOS", reply_markup=MAIN_MENU)
# async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# await update.message.reply_text("Setting", reply_markup=SETTING_MENU)
# return SETTING
# async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
# await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU)
# return ConversationHandler.END
# async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE):
# await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
#
# fsq_id = context.user_data["fsq_id"]
# poi = get_loc(context.user_data["fsq_id"])
# media_id = []
#
# if context.user_data.get("photo") is not None:
# media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg")
# media_id = [media["id"]]
# # else:
# # photo_url = get_poi_top_photo(context.user_data["fsq_id"])
# # if photo_url is not None:
# # with urllib.request.urlopen(photo_url) as response:
# # data = response.read()
# # media = mastodon_client.media_post(data, mime_type="image/jpeg")
# # media_id = [media["id"]]
#
# mastodon_client.status_post(
# f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}",
# visibility="private",
# media_ids=media_id)
#
# await update.message.delete()
# return ConversationHandler.END
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Displays info on how to use the bot."""
await update.message.reply_text("Use /start to test this bot.")
# async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
# """Cancels and ends the conversation."""
# user = update.message.from_user
# logger.info("User %s canceled the conversation.", user.first_name)
# await update.message.reply_text(
# text="Setting canceled.",
# # "Bye! I hope we can talk again some day.",
# reply_markup=MAIN_MENU
# )
#
# return ConversationHandler.END
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Cancels and ends the conversation."""
user = update.message.from_user
logger.info("User %s canceled the conversation.", user.first_name)
await update.message.reply_text(
text="Canceled.",
# "Bye! I hope we can talk again some day.",
reply_markup=MAIN_MENU
)
return ConversationHandler.END
class MsgDict(TypedDict):
media_id: str
caption: str
status_id: int
content: str
chat_id: int
async def media_group_sender(context: CallbackContext):
context.job.data = cast(List[MsgDict], context.job.data)
media_id = []
chat_id = context.job.data[0].get("chat_id")
for msg_dict in context.job.data:
if len(media_id) >= 4:
print("Cannot attach more than 4 photos")
break
file = await context.bot.get_file(msg_dict.get("media_id"))
img = io.BytesIO()
await file.download_to_memory(img)
img.seek(0)
media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
media_id.append(media["id"])
mastodon_client.status_update(
status=msg_dict.get("content"),
id=msg_dict.get("status_id"),
media_ids=media_id)
await context.bot.send_message(chat_id=chat_id, text="Done",
reply_markup=MAIN_MENU
)
async def photo(update: Update, context: CallbackContext):
"""Stores the photo and asks for a location."""
await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
try:
await context.bot.delete_message(chat_id=update.message.chat_id, message_id=context.user_data["prompt_attach_photo_msg_id"])
except telegram.error.BadRequest as e:
if "not found" in str(e.message):
print("prompt_attach_photo_msg already deleted")
pass
status_id = context.user_data["status_id"]
status_content = context.user_data["status_content"]
message = update.effective_message
context.user_data["media"] = []
if message.media_group_id:
media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
msg_dict = {
"media_id": media_id,
"caption": message.caption_html,
"status_id": status_id,
"content": status_content,
"chat_id": message.chat_id,
}
jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id))
if jobs:
jobs[0].data.append(msg_dict)
else:
context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict],
name=str(message.media_group_id))
else:
file = await update.message.effective_attachment[-1].get_file()
img = io.BytesIO()
await file.download_to_memory(img)
img.seek(0)
media = mastodon_client.media_post(img.read(), mime_type="image/jpeg")
mastodon_client.status_update(
status=status_content,
id=status_id,
media_ids=media["id"])
await update.message.reply_text(text="Done",
reply_markup=MAIN_MENU
)
async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
await query.delete_message()
await query.message.reply_text(
text="Done.", reply_markup=MAIN_MENU
)
return ConversationHandler.END
def main() -> None:
application = Application.builder().token(BOT_TOKEN).build()
checkin_handler = ConversationHandler(
entry_points=[
CommandHandler("start", start),
MessageHandler(filters.LOCATION, checkin),
],
states={
LOCATION: [
MessageHandler(filters.LOCATION, checkin),
],
LOCATION_SEARCH: [
MessageHandler(filters.TEXT, location_search_callback),
CallbackQueryHandler(skip_location_search),
],
WAIT_LOC: [
CallbackQueryHandler(process_callback),
MessageHandler(filters.TEXT, manual_location_process_callback)
],
COMMENT: [
MessageHandler(filters.TEXT, comment_callback),
CallbackQueryHandler(skip_comment_callback),
],
PHOTO: [MessageHandler(filters.PHOTO, photo),
CallbackQueryHandler(skip_photo)],
},
fallbacks=[CommandHandler("cancel", cancel)],
per_message=False,
allow_reentry=True,
)
application.add_handler(checkin_handler, 1)
# Run the bot until the user presses Ctrl-C
application.run_polling()
if __name__ == "__main__":
main()