import io
from typing import cast, List
from telegram import ReplyKeyboardRemove
from telegram.constants import ChatAction
from telegram.ext import CallbackContext
from command import *
from dbstore.peewee_store import get_poi_by_fsq_id
from foursquare.poi import query_poi, query_poi_by_fsq_id, OSM_ENDPOINT
from config import BOT_SCOPE, ENCRYPT_KEY
from dbstore.peewee_store import User, db, TOOT_VISIBILITY_PRIVATE, TOOT_VISIBILITY_PUBLIC, TOOT_VISIBILITY_UNLISTED
import uuid
from mastodon import Mastodon
from util import decrypt
def generate_uuid():
return str(uuid.uuid4())
def get_mastodon_client(user_id: int):
with db.connection_context():
user = User.get(User.telegram_user_id == user_id)
if user.home_instance and user.access_key:
feature_set = "pleroma" if user.home_instance_type == "pleroma" else "mainline"
return Mastodon(access_token=decrypt(user.access_key, ENCRYPT_KEY),
api_base_url=user.home_instance, feature_set=feature_set, version_check_mode='none', )
def generate_toot_text(poi_name, poi_locality, poi_region, poi_lat, poi_lon):
osm_url = OSM_ENDPOINT.format(poi_lat, poi_lon)
location = ""
if poi_locality:
location = poi_locality
if poi_region:
location += ", " + poi_region
if location:
return f"I'm at {poi_name} in {location}, {osm_url}"
else:
return f"I'm at {poi_name}, {osm_url}"
async def get_img_file_bytes(telegram_media_file):
img = io.BytesIO()
await telegram_media_file.download_to_memory(img)
img.seek(0)
return img.read()
async def process_media_group(context: CallbackContext):
context.job.data = cast(List[MsgDict], context.job.data)
mastodon_client = get_mastodon_client(context.user_data["user_id"])
media_id = []
chat_id = context.job.data[0].get("chat_id")
for media_dict in context.job.data:
if len(media_id) >= 4:
await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU)
return
file = await context.bot.get_file(media_dict.get("media_id"))
img = await get_img_file_bytes(file)
media = mastodon_client.media_post(img, description=media_dict.get("caption"), mime_type="image/jpeg")
media_id.append(media["id"])
mastodon_client.status_update(id=media_dict.get("status_id"),
status=media_dict.get("content"),
media_ids=media_id)
await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU)
async def callback_generate_fedi_login_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
home_instance = update.effective_message.text
client_id, client_secret = Mastodon.create_app(
"Checkin.bot",
scopes=BOT_SCOPE,
redirect_uris="{}{}".format(BOT_DOMAIN, FEDI_LOGIN_CALLBACK_URL),
api_base_url=home_instance,
)
m = Mastodon(client_id=client_id, client_secret=client_secret, api_base_url=home_instance)
user_id = update.effective_user.id
state = generate_uuid()
with db.connection_context():
u = User.get_or_none(telegram_user_id=user_id)
if u is None:
u = User.create(telegram_user_id=user_id, access_key="", home_instance=home_instance,
client_id=client_id, client_secret=client_secret, state=state)
u.save()
oauth_url = m.auth_request_url(redirect_uris="{}{}".format(BOT_DOMAIN, FEDI_LOGIN_CALLBACK_URL),
scopes=BOT_SCOPE,
state=state)
msg = await update.message.reply_text(PROMPT_FEDI_LOGIN,
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("Login", url=oauth_url)]]),
parse_mode=ParseMode.MARKDOWN)
context.user_data[PROMPT_FEDI_LOGIN] = msg.message_id
return FEDI_LOGIN
async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
if update.message.venue is not None:
context.user_data["fsq_id"] = update.message.venue.foursquare_id
context.user_data["title"] = update.message.venue.title
context.user_data["latitude"] = update.message.venue.location.latitude
context.user_data["longitude"] = update.message.venue.location.longitude
poi = query_poi_by_fsq_id(context.user_data.get("fsq_id"))
content = generate_toot_text(poi["name"], poi["locality"], poi["region"], poi["latitude"], poi["longitude"])
# TODO
# mastodon.py status_update() currently does not support content_type parameter
with db.connection_context():
u = User.get(User.telegram_user_id == update.effective_user.id)
content_type = "text/markdown" if u.home_instance_type == "pleroma" else "text/plain"
status = get_mastodon_client(update.effective_user.id).status_post(content,
visibility=TOOT_VISIBILITY_PRIVATE,
content_type=content_type,
media_ids=[])
context.user_data[KEY_TOOT_STATUS_ID] = status["id"]
context.user_data[KEY_TOOT_STATUS_CONTENT] = content
await update.message.reply_text(text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}",
parse_mode=ParseMode.MARKDOWN)
msg = await update.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
context.user_data[PROMPT_ADD_COMMENT] = msg.message_id
return ADD_COMMENT
else:
context.user_data["latitude"] = update.message.location.latitude
context.user_data["longitude"] = update.message.location.longitude
await update.message.reply_text("Searching...", reply_markup=ReplyKeyboardRemove())
msg = await update.message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU)
context.user_data[PROMPT_LOCATION_KEYWORD] = msg.message_id
return LOCATION_SEARCH_KEYWORD
async def _process_location_search(keyword, lat, lon) -> list:
keyboard = []
poi_result = []
if keyword:
poi_result = query_poi(keyword, lat, lon)
if len(poi_result) == 0:
poi_result = query_poi("", lat, lon)
for poi in poi_result:
keyboard.append([
InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"]),
])
return keyboard
async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await context.bot.delete_message(update.effective_chat.id, context.user_data.get(PROMPT_LOCATION_KEYWORD))
key = update.effective_message.text
keyboard = await _process_location_search(key, context.user_data.get("latitude"),
context.user_data.get("longitude"))
if len(keyboard) == 0:
msg = await update.message.reply_text(PROMPT_WAIT_LOCATION_CONFIRMATION_NO_NEARBY_POI)
else:
msg = await update.message.reply_text(PROMPT_WAIT_LOCATION_CONFIRMATION,
reply_markup=InlineKeyboardMarkup(keyboard))
context.user_data[PROMPT_WAIT_LOCATION_CONFIRMATION] = msg.message_id
return LOCATION_CONFIRMATION
async def callback_skip_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
await query.message.delete()
keyboard = await _process_location_search("", context.user_data.get("latitude"), context.user_data.get("longitude"))
await query.message.reply_text(PROMPT_WAIT_LOCATION_CONFIRMATION, reply_markup=InlineKeyboardMarkup(keyboard))
return LOCATION_CONFIRMATION
async def _process_location_selection(context: ContextTypes.DEFAULT_TYPE) -> int:
poi_name = context.user_data.get("poi_name")
if context.user_data.get("fsq_id") is not None:
poi = get_poi_by_fsq_id(context.user_data.get("fsq_id"))
poi_name = poi["name"]
content = generate_toot_text(poi["name"], poi["locality"], poi["region"], poi["latitude"], poi["longitude"])
else:
content = generate_toot_text(poi_name, "", "", context.user_data.get("latitude"),
context.user_data.get("longitude"))
with db.connection_context():
u = User.get(User.telegram_user_id == context.user_data["user_id"])
content_type = "text/markdown" if u.home_instance_type == "pleroma" else "text/plain"
status = get_mastodon_client(context.user_data["user_id"]).status_post(content,
visibility=TOOT_VISIBILITY_PRIVATE,
content_type=content_type,
media_ids=[])
context.user_data[KEY_TOOT_STATUS_ID] = status["id"]
context.user_data[KEY_TOOT_STATUS_CONTENT] = content
await context.bot.send_message(chat_id=context.user_data.get("chat_id"),
text=f"Selected place: {poi_name}, \nPosted to Mastodon: {status['url']}",
parse_mode=ParseMode.MARKDOWN)
msg = await context.bot.send_message(chat_id=context.user_data.get("chat_id"),
text=PROMPT_ADD_COMMENT,
reply_markup=INLINE_SKIP_MENU)
context.user_data[PROMPT_ADD_COMMENT] = msg.message_id
return ADD_COMMENT
async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
context.user_data["fsq_id"] = query.data
context.user_data["user_id"] = update.effective_user.id
await query.delete_message()
context.user_data["chat_id"] = update.effective_chat.id
return await _process_location_selection(context)
async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
context.user_data["poi_name"] = update.effective_message.text
context.user_data["chat_id"] = update.effective_chat.id
context.user_data["user_id"] = update.effective_user.id
return await _process_location_selection(context)
async def _process_comment(context: ContextTypes.DEFAULT_TYPE) -> int:
msg = await context.bot.send_message(chat_id=context.user_data.get("chat_id"),
text=PROMPT_ADD_MEDIA,
reply_markup=INLINE_SKIP_MENU)
context.user_data[PROMPT_ADD_MEDIA] = msg.message_id
return ADD_MEDIA
async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
context.user_data["chat_id"] = update.effective_chat.id
context.user_data["user_id"] = update.effective_user.id
await context.bot.delete_message(update.effective_chat.id, context.user_data.get(PROMPT_ADD_COMMENT))
comment = update.effective_message.text
get_mastodon_client(update.effective_user.id).status_update(id=context.user_data.get(KEY_TOOT_STATUS_ID),
status=f"{comment} " + context.user_data.get(
KEY_TOOT_STATUS_CONTENT))
context.user_data[KEY_TOOT_STATUS_CONTENT] = f"{comment} " + context.user_data.get(KEY_TOOT_STATUS_CONTENT)
return await _process_comment(context)
async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
context.user_data["chat_id"] = update.effective_chat.id
await context.bot.delete_message(update.effective_chat.id, context.user_data.get(PROMPT_ADD_COMMENT))
return await _process_comment(context)
async def callback_add_media(update: Update, context: CallbackContext):
await update.message.reply_chat_action(ChatAction.TYPING)
try:
await context.bot.delete_message(chat_id=update.message.chat_id,
message_id=context.user_data.get(PROMPT_ADD_MEDIA))
except BadRequest as e:
if "not found" in str(e.message):
pass
mastodon_client = get_mastodon_client(update.effective_user.id)
status_id = context.user_data.get(KEY_TOOT_STATUS_ID)
status_content = context.user_data.get(KEY_TOOT_STATUS_CONTENT)
message = update.effective_message
context.user_data["media"] = []
if message.media_group_id:
media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
group_media = {
"media_id": media_id,
"caption": message.caption_html,
"status_id": status_id,
"content": status_content,
"chat_id": message.chat_id,
}
jobs = context.job_queue.get_jobs_by_name(str(message.media_group_id))
if jobs:
jobs[0].data.append(group_media)
else:
context.job_queue.run_once(callback=process_media_group,
when=MEDIA_GROUP_TIMEOUT,
data=[group_media],
name=str(message.media_group_id))
else:
file = await update.message.effective_attachment[-1].get_file()
img = await get_img_file_bytes(file)
media = mastodon_client.media_post(img,
description=message.caption_html,
mime_type="image/jpeg")
mastodon_client.status_update(status=status_content, id=status_id, media_ids=media["id"])
await update.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
await query.delete_message()
await query.message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
return ConversationHandler.END