aboutsummaryrefslogblamecommitdiff
blob: 687508a9f8b6c0ae49c1982d9a2721a5a37db57d (plain) (tree)
1
2
3
4
5
6
7
8
9





                                       
                             

                                                                                            

                                                    



























































































































































































































































                                                                                                                      
import io
from foursquare.poi import OSM_ENDPOINT
from foursquare.poi import query_poi
from dbstore.dbm_store import get_loc
from toot import mastodon_client
from command import *
from typing import cast, List
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardRemove
from telegram.ext import ContextTypes, ConversationHandler, CallbackContext
from telegram.constants import ParseMode, ChatAction
from telegram.error import BadRequest


async def callback_skip_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Finish the conversation without attaching any media.

    Answers the callback query, deletes the message the query originated
    from, replies with ``PROMPT_DONE`` together with ``MAIN_MENU`` and ends
    the conversation.

    Returns:
        ``ConversationHandler.END``.
    """
    callback = update.callback_query

    # Acknowledge the button press before touching any messages.
    await callback.answer()
    await callback.delete_message()

    await callback.message.reply_text(reply_markup=MAIN_MENU, text=PROMPT_DONE)
    return ConversationHandler.END


async def callback_location_sharing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle a shared venue or a bare location.

    When the message carries a venue, its Foursquare id, title and
    coordinates are stored in ``context.user_data``, the place is looked up
    with ``get_loc``, a status with ``visibility="private"`` is posted through
    ``mastodon_client`` and the user is prompted for a comment.

    When the message carries no venue, the coordinates of its location are
    stored and the user is prompted for a search keyword.

    Returns:
        ``ADD_COMMENT`` after a venue was posted, otherwise
        ``LOCATION_SEARCH_KEYWORD``.
    """
    message = update.message
    venue = message.venue

    if venue is None:
        # Bare location: remember where the user is and ask what to search for.
        context.user_data["latitude"] = message.location.latitude
        context.user_data["longitude"] = message.location.longitude

        await message.reply_text("Searching...", reply_markup=ReplyKeyboardRemove())
        prompt_msg = await message.reply_text(PROMPT_LOCATION_KEYWORD, reply_markup=INLINE_SKIP_MENU)

        # Stored so a later handler can delete the prompt again.
        context.user_data[PROMPT_LOCATION_KEYWORD] = prompt_msg.message_id
        return LOCATION_SEARCH_KEYWORD

    # NOTE(review): a venue's Foursquare id may be absent for venues that do
    # not come from Foursquare - confirm that get_loc copes with None.
    context.user_data["fsq_id"] = venue.foursquare_id
    context.user_data["title"] = venue.title
    context.user_data["latitude"] = venue.location.latitude
    context.user_data["longitude"] = venue.location.longitude

    poi = get_loc(context.user_data["fsq_id"])
    content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
    status = mastodon_client.status_post(
        content,
        visibility="private",
        media_ids=[])

    # Kept for the follow-up steps, which edit this status in place.
    context.user_data["status_id"] = status["id"]
    context.user_data["status_content"] = content

    print("status_id", context.user_data["status_id"])

    # The text contains no markup, only external data (place name, URL).
    # Sending it through the Markdown parser made Telegram reject the message
    # whenever that data contained an unbalanced "_", "*", "`" or "[", so it
    # is sent as plain text. parse_mode=None is passed explicitly so that a
    # bot-wide default parse mode cannot re-enable parsing.
    await message.reply_text(
        text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}",
        parse_mode=None,
    )

    prompt_attach_comment_msg = await message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id

    return ADD_COMMENT


async def callback_manual_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Post a check-in for a place name the user typed in.

    The message text is used as the place name and combined with a map URL
    built from ``OSM_ENDPOINT`` and the coordinates stored in
    ``context.user_data``. The result is posted through ``mastodon_client``
    with ``visibility="private"`` and the user is prompted for a comment.

    Returns:
        ``ADD_COMMENT``.
    """
    # effective_message is used throughout (the original mixed it with
    # update.message) so reading the text and replying hit the same message.
    message = update.effective_message
    loc = message.text

    osm_url = OSM_ENDPOINT.format(context.user_data["latitude"], context.user_data["longitude"])
    content = f"I'm at {loc}, {osm_url}"
    status = mastodon_client.status_post(
        content,
        visibility="private",
        media_ids=[])

    # Kept for the follow-up steps, which edit this status in place.
    context.user_data["status_id"] = status["id"]
    context.user_data["status_content"] = content

    print("status_id", context.user_data["status_id"])

    # `loc` is free text typed by the user and the template has no markup.
    # Parsing it as Markdown made Telegram reject the reply as soon as the
    # text contained an unbalanced "_", "*", "`" or "[", so it is sent as
    # plain text. parse_mode=None is explicit so a bot-wide default parse mode
    # cannot re-enable parsing.
    await message.reply_text(
        text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}",
        parse_mode=None,
    )

    prompt_attach_comment_msg = await message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id

    return ADD_COMMENT


async def callback_location_confirmation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Post a check-in for the place picked from the inline keyboard.

    The callback data is stored as ``fsq_id`` in ``context.user_data``, the
    keyboard message is deleted, the place is looked up with ``get_loc`` and a
    status with ``visibility="private"`` is posted through
    ``mastodon_client``. The user is then prompted for a comment.

    Returns:
        ``ADD_COMMENT``.
    """
    # Function-scope import so this block does not rely on a module-level
    # import being added elsewhere.
    from telegram.helpers import escape_markdown

    query = update.callback_query
    await query.answer()
    context.user_data["fsq_id"] = query.data

    await query.delete_message()

    poi = get_loc(context.user_data["fsq_id"])
    content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
    status = mastodon_client.status_post(
        content,
        visibility="private",
        media_ids=[])

    # Kept for the follow-up steps, which edit this status in place.
    context.user_data["status_id"] = status["id"]
    context.user_data["status_content"] = content

    print("status_id", context.user_data["status_id"])

    # The message is parsed as Markdown because of the code span around the
    # id. The place name and the URL are external data, so they are escaped;
    # otherwise a stray "_", "*", "`" or "[" in them makes Telegram reject the
    # whole message.
    place_name = escape_markdown(str(poi['name']))
    status_url = escape_markdown(str(status['url']))
    await query.message.reply_text(
        text=f"Selected place: {place_name}, `{query.data}`\nPosted to Mastodon: {status_url}",
        parse_mode=ParseMode.MARKDOWN,
    )

    prompt_attach_comment_msg = await query.message.reply_text(PROMPT_ADD_COMMENT, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_ADD_COMMENT] = prompt_attach_comment_msg.message_id

    return ADD_COMMENT


async def callback_location_keyword_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Search places near the stored coordinates using the typed keyword.

    Deletes the keyword prompt, queries ``query_poi`` with the message text
    and falls back to an empty keyword when that yields nothing. The results
    are offered as an inline keyboard with one button per place; when there
    are none at all, ``PROMPT_NO_NEARBY_POI`` is sent instead.

    Returns:
        ``LOCATION_CONFIRMATION`` in both cases.
    """
    await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_LOCATION_KEYWORD])

    keyword = update.effective_message.text
    lat = context.user_data["latitude"]
    lon = context.user_data["longitude"]

    candidates = query_poi(keyword, lat, lon)
    if not candidates:
        # Nothing matched the keyword: retry without one.
        candidates = query_poi("", lat, lon)

    # One button per row; the Foursquare id travels as callback data.
    rows = [
        [InlineKeyboardButton(place["name"], callback_data=place["fsq_id"])]
        for place in candidates
    ]

    if rows:
        markup = InlineKeyboardMarkup(rows)
        context.user_data["location_search"] = keyword
        await update.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=markup)
    else:
        await update.message.reply_text(PROMPT_NO_NEARBY_POI)

    return LOCATION_CONFIRMATION


async def callback_skip_location_keyword(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """List places near the stored coordinates without a search keyword.

    Answers the callback query, deletes the prompt message and queries
    ``query_poi`` with an empty keyword. The results are offered as an inline
    keyboard with one button per place. When nothing is found,
    ``PROMPT_NO_NEARBY_POI`` is sent instead of a prompt with an empty
    keyboard, matching ``callback_location_keyword_search``.

    Returns:
        ``LOCATION_CONFIRMATION`` in both cases.
    """
    query = update.callback_query
    await query.answer()

    await query.message.delete()
    latitude = context.user_data["latitude"]
    longitude = context.user_data["longitude"]

    # One button per row; the Foursquare id travels as callback data.
    keyboard = [
        [InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"])]
        for poi in query_poi("", latitude, longitude)
    ]

    if not keyboard:
        # Asking the user to choose from a list with no entries leaves
        # nothing to press, so say that nothing was found.
        await query.message.reply_text(PROMPT_NO_NEARBY_POI)
        return LOCATION_CONFIRMATION

    reply_markup = InlineKeyboardMarkup(keyboard)
    await query.message.reply_text(PROMPT_CHOOSE_POI_FROM_LIST, reply_markup=reply_markup)

    return LOCATION_CONFIRMATION


async def callback_add_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Prepend the user's comment to the posted status.

    Deletes the comment prompt, updates the status identified by the stored
    ``status_id`` so that its text is the comment followed by the previous
    content, stores the new content and prompts for media.

    Returns:
        ``ADD_MEDIA``.
    """
    chat_id = update.effective_chat.id
    await context.bot.delete_message(chat_id, context.user_data[PROMPT_ADD_COMMENT])

    comment = update.effective_message.text
    # Build the combined text once and reuse it for the update and the store.
    new_content = f"{comment} " + context.user_data["status_content"]

    mastodon_client.status_update(
        status=new_content,
        id=context.user_data["status_id"])

    # Only remembered after the remote update went through.
    context.user_data["status_content"] = new_content

    media_prompt = await update.message.reply_text(PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_ADD_MEDIA] = media_prompt.message_id

    return ADD_MEDIA


async def callback_skip_comment(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Skip the comment step and move on to the media prompt.

    Deletes the comment prompt and sends the media prompt, storing the new
    prompt's message id in ``context.user_data``.

    Works for both a callback-query update and a plain message update.

    Returns:
        ``ADD_MEDIA``.
    """
    # The comment prompt is sent with INLINE_SKIP_MENU, so this handler is
    # presumably triggered by a callback query. On such an update
    # update.message is None, so the original update.message.reply_text would
    # fail; effective_message covers both update kinds.
    query = update.callback_query
    if query is not None:
        # Acknowledge the press, as the other skip handlers do.
        await query.answer()

    await context.bot.delete_message(update.effective_chat.id, context.user_data[PROMPT_ADD_COMMENT])

    prompt_attach_photo_msg = await update.effective_message.reply_text(
        PROMPT_ADD_MEDIA, reply_markup=INLINE_SKIP_MENU)
    context.user_data[PROMPT_ADD_MEDIA] = prompt_attach_photo_msg.message_id
    return ADD_MEDIA


async def process_media_group(context: CallbackContext):
    """Job callback: upload a collected media group and attach it to a status.

    ``context.job.data`` is a non-empty list of dicts, each read for the keys
    ``media_id`` (Telegram file id), ``content``, ``status_id`` and
    ``chat_id``.

    At most four files are downloaded from Telegram and uploaded through
    ``mastodon_client.media_post``. The status is then updated once with all
    uploaded media ids, instead of once per file as before.

    If more than four items were collected, the extra ones are ignored and
    ``PROMPT_MAX_PHOTO_REACHED`` is sent; otherwise ``PROMPT_DONE`` is sent.
    Both go to the chat of the first item.
    """
    # Cap enforced by the original implementation (hard-coded as 4 there).
    max_attachments = 4

    pending = cast(List[MsgDict], context.job.data)
    chat_id = pending[0].get("chat_id")
    accepted = pending[:max_attachments]

    media_ids = []
    for msg_dict in accepted:
        file = await context.bot.get_file(msg_dict.get("media_id"))
        img = io.BytesIO()
        await file.download_to_memory(img)

        # getvalue() returns the whole buffer regardless of stream position.
        media = mastodon_client.media_post(img.getvalue(), mime_type="image/jpeg")
        media_ids.append(media["id"])

    # A single update with the complete id list. The last accepted item
    # supplies the text and status id, as it did in the original's final
    # per-item update.
    # NOTE(review): assumes all items of one group carry the same status_id
    # and content - confirm against the code that enqueues them.
    last = accepted[-1]
    mastodon_client.status_update(
        status=last.get("content"),
        id=last.get("status_id"),
        media_ids=media_ids)

    if len(pending) > max_attachments:
        await context.bot.send_message(chat_id=chat_id, text=PROMPT_MAX_PHOTO_REACHED, reply_markup=MAIN_MENU)
        return

    await context.bot.send_message(chat_id=chat_id, text=PROMPT_DONE, reply_markup=MAIN_MENU)


async def callback_add_media(update: Update, context: CallbackContext):
    """Attach the media of the incoming message to the current status.

    Deletes the media prompt (best effort), then:

    * for a message that belongs to a media group, appends its file id and
      status details to a job named after the group id, creating that job
      with ``process_media_group`` and ``MEDIA_GROUP_TIMEOUT`` if it does not
      exist yet;
    * for a single message, downloads the attachment, uploads it through
      ``mastodon_client.media_post``, attaches it to the stored status and
      replies with ``PROMPT_DONE``.

    Returns ``None`` on every path.
    NOTE(review): unlike ``callback_skip_media`` the single-message path does
    not return ``ConversationHandler.END`` - confirm this is intended.
    """
    # Function-scope import so this block does not rely on a module-level
    # import being added elsewhere.
    import logging

    message = update.effective_message
    await message.reply_chat_action(ChatAction.TYPING)

    try:
        await context.bot.delete_message(chat_id=message.chat_id,
                                         message_id=context.user_data[PROMPT_ADD_MEDIA])
    except BadRequest as e:
        # Presumably every message of a media group reaches this handler, so
        # the prompt is often already gone: "not found" is expected and
        # ignored. The original swallowed every other BadRequest silently as
        # well; deleting the prompt is still best effort, but those are now
        # logged rather than hidden.
        if "not found" not in str(e.message):
            logging.getLogger(__name__).warning("Could not delete the media prompt: %s", e.message)

    status_id = context.user_data["status_id"]
    status_content = context.user_data["status_content"]

    context.user_data["media"] = []
    if message.media_group_id:
        # Photos arrive as several sizes; the last entry is used.
        media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
        msg_dict = {
            "media_id": media_id,
            "caption": message.caption_html,
            "status_id": status_id,
            "content": status_content,
            "chat_id": message.chat_id,
        }
        group_name = str(message.media_group_id)
        jobs = context.job_queue.get_jobs_by_name(group_name)
        if jobs:
            # A job for this group is already pending: add to its batch.
            jobs[0].data.append(msg_dict)
        else:
            context.job_queue.run_once(callback=process_media_group, when=MEDIA_GROUP_TIMEOUT,
                                       data=[msg_dict], name=group_name)
    else:
        # Resolve the attachment the same way as the media-group branch. The
        # original indexed effective_attachment[-1], which only works when
        # the attachment is a sequence of photo sizes and fails for a single
        # attachment object such as a document.
        attachment = message.photo[-1] if message.photo else message.effective_attachment
        file = await attachment.get_file()
        img = io.BytesIO()
        await file.download_to_memory(img)

        media = mastodon_client.media_post(img.getvalue(), mime_type="image/jpeg")
        mastodon_client.status_update(
            status=status_content,
            id=status_id,
            media_ids=[media["id"]])

        await message.reply_text(text=PROMPT_DONE, reply_markup=MAIN_MENU)
Powered by cgit v1.2.3 (git 2.41.0)