aboutsummaryrefslogblamecommitdiff
path: root/bot.py
blob: c7a31809630e9e17a7788bef5e14f3cb4595039f (plain) (tree)
1
2
3
4
5
6
7
8
9



                                                                       
              
         
                         
                                          
                         












                                                                                              
                                       
                                                                                                            

                                                                                                                    
                            
                                    

                                     
                                        



                                      
                                                                                      






                                                                                     
                                 


                                                                  

  





                                                                               
 





                                               
 
 
                                                                           


                                                                                                               

                                                                                            
                                                                                              
 
                                                                                            
                                                                                                   

                   

 
                                                                             


























                                                                                                 


                                                                                                
 
                      









                                                                                                            
 
 































                                                                                                      
                                                                                      
                             
                                 
                        


                                            
 


                                                                                             
                                         











                                                                                                   
                                

     


                                                                                            
 
                  

 
                                                                                              
                                                                                                  
 




                                                   















                                                                                                       



                   


















                                                                                          


                   























                                                                                                                 

                                                                            

 


                                                                                
 
 


                                                                                            

 
























                                                                                                    






                                                                                   










                                                                                      

 




                                                                            


                                                    


















                                                            


                                                     














                                                                              

                                                         



                                                          

                                                                                





                                                                                                                                    
 













                                                                                                       










                                                                                            
 






                                                                              

                                                              

 


                                                                                
 

                                   
                                            



                                  
                   
                                                                
 
                                          

                                           
                                                      




                                                          

                                                                       
                                                           
              







                                                                              
                                                         
                                                      


                                                     
                           

     
                                               






                                               
#!/usr/bin/env python
# pylint: disable=unused-argument, wrong-import-position
# This program is dedicated to the public domain under the CC0 license.

import logging
import io
import telegram.constants
from telegram import __version__ as TG_VER
from pprint import pprint

# Import the structured version tuple if the installed python-telegram-bot
# exposes it; otherwise fall back to an all-zero tuple so that the
# comparison below fails and the incompatibility is reported.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Refuse to run on anything older than 20.0 alpha 1; the error message points
# at the examples page for the installed version (TG_VER).
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )

from foursquare.poi import OSM_ENDPOINT
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, MessageHandler, filters, \
    ConversationHandler, CallbackContext
from config import BOT_TOKEN
from foursquare.poi import query_poi
from dbstore.dbm_store import get_loc
from toot import mastodon_client
from typing import TypedDict, List, cast

# NOTE(review): `scheduler`, `PRIVACY` and `TOOT` are not referenced anywhere
# else in the visible part of this file - confirm whether they are still needed.
scheduler = None
# PRIVACY and TOOT are the single-character strings chr(8) and chr(9).
PRIVACY, TOOT = map(chr, range(8, 10))

# Conversation states, numbered 0..6. main() only registers handlers for
# LOCATION, LOCATION_SEARCH, WAIT_LOC, COMMENT and PHOTO.
WAIT_LOC, LOCATION, LOCATION_SEARCH, PHOTO, PROCESS_PHOTO, COMMENT, SETTING = range(7)

# Enable logging
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)

# Reply keyboard with a single button that asks Telegram to share the user's
# current location (request_location=True).
MAIN_MENU = ReplyKeyboardMarkup([
    [KeyboardButton(text="Check-in here", request_location=True)],
    # [KeyboardButton(text="/cancel")],
    # [KeyboardButton(text="/setting")]
])

# Callback data carried by the "Skip" button.
SKIP_LOCATION_SEARCH = "skip_location_search"

# Inline keyboard with one "Skip" button. The handlers in this file attach it
# to the search, comment and photo prompts alike, so which step is skipped
# depends on the conversation state, not on the callback data.
SKIP_MENU = InlineKeyboardMarkup([
    [telegram.InlineKeyboardButton("Skip", callback_data=SKIP_LOCATION_SEARCH)]
])


# SETTING_MENU = InlineKeyboardMarkup(
#     [
#         [InlineKeyboardButton(text="/tos")],
#         [InlineKeyboardButton(text="/back")],
#     ]
# )


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Introduce the bot and show the main reply keyboard.

    Sends two messages: the introduction text (parsed as legacy Markdown) and
    a prompt that carries ``MAIN_MENU``.

    Args:
        update: Incoming update; its ``message`` is replied to.
        context: Handler context (unused).

    Returns:
        The ``LOCATION`` conversation state.
    """
    # Each fragment ends with whitespace or a newline so the implicitly
    # concatenated literals do not run two sentences together.
    hello = "Hello, this is `checkin.bot`. \n\n" \
            "This is a Telegram bot with functionality similar to Foursquare Swarm, " \
            "but check in and post your location to the Fediverse (Mastodon/Pleroma) instead of Twitter.\n\n" \
            "Aware of privacy concerns, this bot will not store your location data. " \
            "*Be safe and cautious when sharing your real time location on the web.* \n\n" \
            "Start using this bot by sharing your location using Telegram context menu to it."

    await update.message.reply_text(hello, parse_mode=telegram.constants.ParseMode.MARKDOWN)
    await update.message.reply_text("Use bot keyboard to choose an action", reply_markup=MAIN_MENU)

    return LOCATION


async def checkin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle a shared location or venue message.

    * Plain location: remember the coordinates in ``user_data``, remove the
      reply keyboard and prompt for search keywords (with a Skip button).
    * Venue: look the place up with ``get_loc`` using the venue's Foursquare
      id, post a private status through ``mastodon_client`` and prompt for an
      optional comment.

    The coordinates are written to ``user_data`` only; they are deliberately
    not printed or logged.

    Args:
        update: Incoming update whose ``message`` carries a location or venue.
        context: Handler context; ``user_data`` receives the check-in state.

    Returns:
        ``LOCATION_SEARCH`` for a plain location, ``COMMENT`` for a venue.
    """
    message = update.message
    user_data = context.user_data
    venue = message.venue

    if venue is None:
        user_data["latitude"] = message.location.latitude
        user_data["longitude"] = message.location.longitude

        await message.reply_text("Searching...", reply_markup=telegram.ReplyKeyboardRemove())
        prompt_msg = await message.reply_text("You can input location search keywords or press skip",
                                              reply_markup=SKIP_MENU)

        # Remembered so the prompt can be deleted once the user answers.
        user_data["prompt_msg_id"] = prompt_msg.message_id
        return LOCATION_SEARCH

    user_data["fsq_id"] = venue.foursquare_id
    user_data["title"] = venue.title
    user_data["latitude"] = venue.location.latitude
    user_data["longitude"] = venue.location.longitude

    poi = get_loc(user_data["fsq_id"])
    content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
    status = mastodon_client.status_post(
        content,
        visibility="private",
        media_ids=[])

    # Kept so later steps (comment, photos) can edit this same status.
    user_data["status_id"] = status["id"]
    user_data["status_content"] = content

    logger.info("status_id %s", user_data["status_id"])

    # Sent as plain text: the template contains no markup, and place names or
    # status URLs containing '_', '*', '`' or '[' would make Telegram reject a
    # Markdown-parsed message after the status has already been posted.
    await message.reply_text(
        text=f"Selected place: {poi['name']}, \nPosted to Mastodon: {status['url']}",
    )

    prompt_attach_comment_msg = await message.reply_text(
        "You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU)
    user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id

    return COMMENT


async def manual_location_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Check in at a place whose name the user typed by hand.

    Builds the status from the typed text and an OpenStreetMap URL formatted
    from the coordinates stored in ``user_data``, posts it as a private
    status and prompts for an optional comment.

    Args:
        update: Incoming text message; its text is used as the place name.
        context: Handler context; reads ``latitude``/``longitude`` and stores
            ``status_id``, ``status_content`` and the prompt message id.

    Returns:
        The ``COMMENT`` conversation state.
    """
    user_data = context.user_data

    loc = update.effective_message.text
    osm_url = OSM_ENDPOINT.format(user_data["latitude"], user_data["longitude"])
    content = f"I'm at {loc}, {osm_url}"
    status = mastodon_client.status_post(
        content,
        visibility="private",
        media_ids=[])

    # Kept so later steps (comment, photos) can edit this same status.
    user_data["status_id"] = status["id"]
    user_data["status_content"] = content

    logger.info("status_id %s", user_data["status_id"])

    # Sent as plain text: `loc` is arbitrary user input and the template has
    # no markup, so Markdown parsing could only make Telegram reject the
    # message (e.g. on an unmatched '_' or '*').
    await update.message.reply_text(
        text=f"Manually selected place: {loc}, \nPosted to Mastodon: {status['url']}",
    )

    prompt_attach_comment_msg = await update.message.reply_text(
        "You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU)
    user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id

    return COMMENT


async def process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Check in at the place chosen from the inline keyboard.

    The callback data of the pressed button is taken as the Foursquare id.
    The place is looked up with ``get_loc``, a private status is posted
    through ``mastodon_client`` and the user is prompted for a comment.

    Args:
        update: Incoming callback-query update.
        context: Handler context; stores ``fsq_id``, ``status_id``,
            ``status_content`` and the prompt message id in ``user_data``.

    Returns:
        The ``COMMENT`` conversation state.
    """
    # Local import keeps this handler self-contained.
    from telegram.helpers import escape_markdown

    query = update.callback_query
    await query.answer()

    fsq_id = query.data
    logger.info("process_callback: %s", fsq_id)
    context.user_data["fsq_id"] = fsq_id
    await query.delete_message()

    poi = get_loc(fsq_id)
    content = f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}"
    status = mastodon_client.status_post(
        content,
        visibility="private",
        media_ids=[])

    # Kept so later steps (comment, photos) can edit this same status.
    context.user_data["status_id"] = status["id"]
    context.user_data["status_content"] = content

    logger.info("status_id %s", context.user_data["status_id"])

    # The message is parsed as legacy Markdown (for the inline-code id), so the
    # dynamic parts must be escaped: an unmatched '_', '*', '`' or '[' in the
    # place name or status URL would otherwise make Telegram reject it.
    place_name = escape_markdown(str(poi["name"]), version=1)
    status_url = escape_markdown(str(status["url"]), version=1)
    await query.message.reply_text(
        text=f"Selected place: {place_name}, `{fsq_id}`\nPosted to Mastodon: {status_url}",
        parse_mode=telegram.constants.ParseMode.MARKDOWN,
    )

    prompt_attach_comment_msg = await query.message.reply_text(
        "You can continue adding comments, or press skip to finish", reply_markup=SKIP_MENU)
    context.user_data["prompt_attach_comment_msg_id"] = prompt_attach_comment_msg.message_id

    return COMMENT


async def location_search_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Search places near the stored coordinates using the typed keywords.

    Deletes the earlier search prompt, queries ``query_poi`` with the message
    text and falls back to an unfiltered query when nothing matches. Each
    result becomes one inline button whose callback data is the place's
    ``fsq_id``. When there are no results at all the user is asked to type
    the place name instead.

    Args:
        update: Incoming text message holding the search keywords.
        context: Handler context; reads ``prompt_msg_id``, ``latitude`` and
            ``longitude`` from ``user_data``.

    Returns:
        The ``WAIT_LOC`` conversation state in every case.
    """
    await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_msg_id"])

    location_search = update.effective_message.text
    latitude = context.user_data["latitude"]
    longitude = context.user_data["longitude"]

    poi_result = query_poi(location_search, latitude, longitude)
    if not poi_result:
        # Nothing matched the keywords: fall back to an unfiltered nearby search.
        poi_result = query_poi("", latitude, longitude)

    # One button per row; without this the search results would be discarded
    # and the "no places" branch below would always be taken.
    keyboard = [
        [InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"])]
        for poi in poi_result
    ]

    if not keyboard:
        await update.message.reply_text("No nearby places found. You can input location name manually")
        return WAIT_LOC

    context.user_data["location_search"] = location_search
    await update.message.reply_text("Where are you? ", reply_markup=InlineKeyboardMarkup(keyboard))

    return WAIT_LOC


async def skip_location_search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """List nearby places without a keyword filter after "Skip" is pressed.

    Answers the callback query, deletes the prompt message that carried the
    button and offers one inline button per place returned by
    ``query_poi("", latitude, longitude)``. If no place is returned the user
    is told to type the place name instead of being shown an empty keyboard.

    Args:
        update: Incoming callback-query update.
        context: Handler context; reads ``latitude``/``longitude`` from
            ``user_data``.

    Returns:
        The ``WAIT_LOC`` conversation state in every case.
    """
    query = update.callback_query
    await query.answer()
    logger.info("skip: %s", query.data)

    await query.message.delete()
    latitude = context.user_data["latitude"]
    longitude = context.user_data["longitude"]

    # One button per row; the callback data is the place's Foursquare id.
    keyboard = [
        [InlineKeyboardButton(poi["name"], callback_data=poi["fsq_id"])]
        for poi in query_poi("", latitude, longitude)
    ]

    if not keyboard:
        # WAIT_LOC also accepts plain text, so point the user at that path.
        await query.message.reply_text("No nearby places found. You can input location name manually")
        return WAIT_LOC

    await query.message.reply_text("Where are you? ", reply_markup=InlineKeyboardMarkup(keyboard))

    return WAIT_LOC


async def comment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Prepend the user's comment to the posted status and ask for photos.

    Removes the comment prompt, edits the status identified by
    ``user_data["status_id"]`` so that it starts with the message text,
    stores the new content and sends the photo prompt with a Skip button.

    Returns:
        The ``PHOTO`` conversation state.
    """
    user_data = context.user_data
    await context.bot.delete_message(update.effective_chat.id, user_data["prompt_attach_comment_msg_id"])

    # New status text: "<comment> <previous content>".
    updated_content = f"{update.effective_message.text} " + user_data["status_content"]
    mastodon_client.status_update(status=updated_content, id=user_data["status_id"])
    user_data["status_content"] = updated_content

    photo_prompt = await update.message.reply_text(
        "You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU)
    user_data["prompt_attach_photo_msg_id"] = photo_prompt.message_id

    return PHOTO


async def skip_comment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Skip the comment step and move on to the photo prompt.

    This handler is triggered by an inline button, so the update carries a
    callback query and ``update.message`` is ``None``; replies therefore go
    through ``query.message``.

    Args:
        update: Incoming callback-query update.
        context: Handler context; reads the comment prompt id from
            ``user_data`` and stores the photo prompt id.

    Returns:
        The ``PHOTO`` conversation state.
    """
    query = update.callback_query
    # Acknowledge the button press so the client stops showing a spinner.
    await query.answer()

    await context.bot.delete_message(update.effective_chat.id, context.user_data["prompt_attach_comment_msg_id"])
    prompt_attach_photo_msg = await query.message.reply_text(
        "You can continue attaching photos, or press skip to finish", reply_markup=SKIP_MENU)
    context.user_data["prompt_attach_photo_msg_id"] = prompt_attach_photo_msg.message_id
    return PHOTO


# async def tos(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
#     await update.message.reply_text("TOS", reply_markup=MAIN_MENU)


# async def setting(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
#     await update.message.reply_text("Setting", reply_markup=SETTING_MENU)
#     return SETTING


# async def setting_process_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
#     await update.message.reply_text("Setting Process Callback", reply_markup=SETTING_MENU)
#     return ConversationHandler.END


# async def process_location(update: Update, context: ContextTypes.DEFAULT_TYPE):
#     await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)
#
#     fsq_id = context.user_data["fsq_id"]
#     poi = get_loc(context.user_data["fsq_id"])
#     media_id = []
#
#     if context.user_data.get("photo") is not None:
#         media = mastodon_client.media_post(context.user_data.get("photo"), mime_type="image/jpeg")
#         media_id = [media["id"]]
#     # else:
#     #     photo_url = get_poi_top_photo(context.user_data["fsq_id"])
#     #     if photo_url is not None:
#     #         with urllib.request.urlopen(photo_url) as response:
#     #             data = response.read()
#     #             media = mastodon_client.media_post(data, mime_type="image/jpeg")
#     #             media_id = [media["id"]]
#
#     mastodon_client.status_post(
#         f"I'm at {poi['name']} in {poi['locality']}, {poi['region']}, {poi['osm_url']}",
#         visibility="private",
#         media_ids=media_id)
#
#     await update.message.delete()
#     return ConversationHandler.END


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with a one-line hint on how to use the bot."""
    usage_hint = "Use /start to test this bot."
    await update.message.reply_text(usage_hint)


# async def setting_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
#     """Cancels and ends the conversation."""
#     user = update.message.from_user
#     logger.info("User %s canceled the conversation.", user.first_name)
#     await update.message.reply_text(
#         text="Setting canceled.",
#         # "Bye! I hope we can talk again some day.",
#         reply_markup=MAIN_MENU
#     )
#
#     return ConversationHandler.END


async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Abort the current check-in and restore the main keyboard.

    Logs the first name of the user who cancelled, confirms with "Canceled."
    and ends the conversation.

    Returns:
        ``ConversationHandler.END``.
    """
    message = update.message
    sender = message.from_user
    logger.info("User %s canceled the conversation.", sender.first_name)

    await message.reply_text(text="Canceled.", reply_markup=MAIN_MENU)
    return ConversationHandler.END


class MsgDict(TypedDict):
    """One queued photo of a media group, as built by ``photo()``.

    A list of these is handed to ``media_group_sender`` as the job data.
    """

    # Telegram file id of the photo to download.
    media_id: str
    # HTML caption of the message.
    # NOTE(review): photo() stores message.caption_html here, which is
    # presumably None for photos without a caption - confirm the annotation.
    caption: str
    # Id of the Mastodon status the photo is attached to.
    status_id: int
    # Current text of that status, re-sent when the status is edited.
    content: str
    # Telegram chat that receives the final "Done" message.
    chat_id: int


async def media_group_sender(context: CallbackContext):
    """Job callback: attach the photos of one media group to the status.

    ``context.job.data`` is the list of ``MsgDict`` entries queued by
    ``photo()``. At most four photos are downloaded from Telegram and
    uploaded through ``mastodon_client.media_post``; the status is then
    edited once with all uploaded media, and "Done" is sent to the chat.

    Args:
        context: Job context; ``context.job.data`` must hold at least one
            entry.
    """
    max_attachments = 4

    queued = cast(List[MsgDict], context.job.data)
    chat_id = queued[0].get("chat_id")

    if len(queued) > max_attachments:
        print("Cannot attach more than 4 photos")
    # Work on a snapshot so entries appended while awaiting are not picked up
    # halfway through.
    batch = queued[:max_attachments]

    media_id = []
    for msg_dict in batch:
        file = await context.bot.get_file(msg_dict.get("media_id"))
        img = io.BytesIO()
        await file.download_to_memory(img)

        media = mastodon_client.media_post(img.getvalue(), mime_type="image/jpeg")
        media_id.append(media["id"])

    # Edit the status a single time with every uploaded photo, instead of
    # once per photo with a growing media list.
    last = batch[-1]
    mastodon_client.status_update(
        status=last.get("content"),
        id=last.get("status_id"),
        media_ids=media_id)

    await context.bot.send_message(chat_id=chat_id, text="Done",
                                   reply_markup=MAIN_MENU
                                   )


async def photo(update: Update, context: CallbackContext) -> None:
    """Attach the received photo(s) to the status posted earlier.

    * Photo that belongs to a media group: queue it on a job named after the
      ``media_group_id``. The first photo schedules ``media_group_sender`` to
      run 5 seconds later; subsequent photos are appended to that job's data.
    * Single photo: download it, upload it through ``mastodon_client`` and
      edit the status right away, then reply "Done".

    Nothing is returned, so the conversation state is left as it is.

    Args:
        update: Incoming photo message.
        context: Handler context; reads ``status_id``, ``status_content`` and
            the photo prompt id from ``user_data``.
    """
    await update.message.reply_chat_action(telegram.constants.ChatAction.TYPING)

    try:
        await context.bot.delete_message(
            chat_id=update.message.chat_id,
            message_id=context.user_data["prompt_attach_photo_msg_id"])
    except telegram.error.BadRequest as e:
        # Removing the prompt is best effort. It may already be gone (e.g.
        # removed while handling an earlier photo); anything else is logged
        # rather than silently dropped, but must not abort the upload.
        if "not found" in str(e.message):
            logger.info("prompt_attach_photo_msg already deleted")
        else:
            logger.warning("Could not delete the photo prompt: %s", e.message)

    status_id = context.user_data["status_id"]
    status_content = context.user_data["status_content"]

    message = update.effective_message
    context.user_data["media"] = []

    if message.media_group_id:
        # The last PhotoSize is used; fall back to the generic attachment
        # when the message has no photo sizes.
        media_id = message.photo[-1].file_id if message.photo else message.effective_attachment.file_id
        msg_dict = {
            "media_id": media_id,
            "caption": message.caption_html,
            "status_id": status_id,
            "content": status_content,
            "chat_id": message.chat_id,
        }
        job_name = str(message.media_group_id)
        jobs = context.job_queue.get_jobs_by_name(job_name)
        if jobs:
            jobs[0].data.append(msg_dict)
        else:
            context.job_queue.run_once(callback=media_group_sender, when=5, data=[msg_dict],
                                       name=job_name)
        return

    file = await update.message.effective_attachment[-1].get_file()
    img = io.BytesIO()
    await file.download_to_memory(img)

    media = mastodon_client.media_post(img.getvalue(), mime_type="image/jpeg")
    # Pass a list, matching how media_group_sender supplies media ids.
    mastodon_client.status_update(
        status=status_content,
        id=status_id,
        media_ids=[media["id"]])

    await update.message.reply_text(text="Done",
                                    reply_markup=MAIN_MENU
                                    )


async def skip_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Finish the check-in without attaching a photo.

    Answers the callback query, deletes the message that carried the Skip
    button, replies "Done." with the main keyboard and ends the conversation.

    Returns:
        ``ConversationHandler.END``.
    """
    callback = update.callback_query
    await callback.answer()
    await callback.delete_message()

    await callback.message.reply_text(text="Done.", reply_markup=MAIN_MENU)
    return ConversationHandler.END


def main() -> None:
    """Build the application, register the check-in conversation and poll.

    The conversation starts on ``/start`` or on a shared location and walks
    through LOCATION_SEARCH -> WAIT_LOC -> COMMENT -> PHOTO. ``/cancel`` is
    the fallback in every state.
    """
    application = Application.builder().token(BOT_TOKEN).build()

    # filters.TEXT alone also matches commands. Excluding them keeps "/cancel"
    # from being consumed as a search keyword, place name or comment by a
    # state handler before the fallback gets a chance to run.
    plain_text = filters.TEXT & ~filters.COMMAND

    checkin_handler = ConversationHandler(
        entry_points=[
            CommandHandler("start", start),
            MessageHandler(filters.LOCATION, checkin),
        ],
        states={
            LOCATION: [
                MessageHandler(filters.LOCATION, checkin),
            ],
            LOCATION_SEARCH: [
                MessageHandler(plain_text, location_search_callback),
                CallbackQueryHandler(skip_location_search),
            ],
            WAIT_LOC: [
                CallbackQueryHandler(process_callback),
                MessageHandler(plain_text, manual_location_process_callback)
            ],
            COMMENT: [
                MessageHandler(plain_text, comment_callback),
                CallbackQueryHandler(skip_comment_callback),
            ],
            PHOTO: [MessageHandler(filters.PHOTO, photo),
                    CallbackQueryHandler(skip_photo)],
        },
        fallbacks=[CommandHandler("cancel", cancel)],
        per_message=False,
        allow_reentry=True,
    )

    # Registered in handler group 1.
    application.add_handler(checkin_handler, 1)

    # Run the bot until the user presses Ctrl-C
    application.run_polling()


# Start the bot only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()
Powered by cgit v1.2.3 (git 2.41.0)