import io
import logging
import os
import traceback
from telegram import __version__ as TG_VER
# Compatibility guard: this script targets the python-telegram-bot API whose
# version tuple is at least (20, 0, 0, "alpha", 1).
try:
    from telegram import __version_info__
except ImportError:
    # The installed package does not export ``__version_info__``; fall back to
    # an all-zero tuple so the comparison below rejects it.
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

if __version_info__ < (20, 0, 0, "alpha", 1):
    _incompatible_ptb_message = (
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
    raise RuntimeError(_incompatible_ptb_message)
from telegram import ForceReply, Update, File
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.constants import ParseMode
from PIL import Image
from square import square_size_padding
# Configure root logging: every record at INFO or above is printed as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the ``/start`` command with a one-line description of the bot.

    Args:
        update: The incoming update that triggered the command.
        context: Handler context supplied by the application (unused here).
    """
    # ``update.message`` is None when the command arrives as an *edited*
    # message, which CommandHandler also dispatches by default.
    # ``effective_message`` covers both the new and the edited case.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("This is a bot to output image in square shape")
# Upper bound on the traceback excerpt sent back to the user. Escaping can at
# most double the length, which keeps the reply under Telegram's 4096-character
# message limit.
_MAX_TRACEBACK_CHARS = 2000


def _escape_md_code(text: str) -> str:
    """Escape *text* for use inside a MarkdownV2 code / pre entity.

    Inside those entities only the backslash and the backtick are special.
    """
    return text.replace("\\", "\\\\").replace("`", "\\`")


async def process(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Pad an image sent as a document to a square and send the result back.

    The image format is chosen from the file extension (JPG/JPEG or PNG,
    case-insensitive). The downloaded file is opened with Pillow, passed
    through ``square_size_padding`` and returned as ``<stem>-result.<ext>``.
    Any failure while processing or sending is logged and reported to the
    user as a (truncated) traceback.

    Args:
        update: The incoming update carrying the attachment.
        context: Handler context; its ``bot`` is used to send messages.
    """
    # ``effective_message`` also covers edited messages, where
    # ``update.message`` would be None.
    message = update.effective_message
    if message is None:
        return

    # This handler is registered for every attachment type, but only
    # documents carry a file name and can be processed here.
    document = message.document
    if document is None or not document.file_name:
        await message.reply_text("Please send the image as a file (document) with a file name")
        return

    chat_id = message.chat_id
    filename = document.file_name

    # Split on the LAST dot so names like "my.photo.jpg" resolve to "jpg",
    # and names without any dot are rejected instead of raising IndexError.
    stem, dot, extension = filename.rpartition(".")
    if not dot or not extension:
        await context.bot.send_message(
            chat_id, "File name has no extension, cannot detect the image format"
        )
        return

    extension_upper = extension.upper()
    if extension_upper in ("JPG", "JPEG"):
        save_format = "JPEG"
    elif extension_upper == "PNG":
        save_format = "PNG"
    else:
        await context.bot.send_message(
            chat_id,
            "Image extension `{}` not supported".format(_escape_md_code(extension)),
            parse_mode=ParseMode.MARKDOWN_V2,
        )
        return

    await context.bot.send_message(
        chat_id,
        "Processing `{}`".format(_escape_md_code(filename)),
        parse_mode=ParseMode.MARKDOWN_V2,
    )

    telegram_file = await document.get_file()
    downloaded = io.BytesIO()
    await telegram_file.download_to_memory(downloaded)
    # Rewind so decoding starts at the first byte regardless of where the
    # download left the cursor.
    downloaded.seek(0)

    try:
        image = Image.open(downloaded)
        result = square_size_padding(image)
        output = io.BytesIO()
        result.save(output, format=save_format, quality=100)
        await message.reply_markdown_v2(text="Sending processed result")
        await context.bot.send_document(
            chat_id=chat_id,
            filename="{}-result.{}".format(stem, extension),
            document=output.getvalue(),
        )
    except Exception:
        # Top-level boundary of the handler: record the failure server-side
        # and tell the user what went wrong instead of failing silently.
        logger.exception("Failed to process %s", filename)
        # Keep the tail of the traceback: it holds the actual exception.
        details = traceback.format_exc()[-_MAX_TRACEBACK_CHARS:]
        # The newline after the opening fence stops the first traceback line
        # from being read as a language tag.
        await message.reply_markdown_v2(
            text="Error:\n```\n{}```".format(_escape_md_code(details))
        )
def main() -> None:
    """Build the bot application, register its handlers and start polling.

    The bot token is read from the ``TG_TOKEN`` environment variable.

    Raises:
        RuntimeError: If ``TG_TOKEN`` is unset or empty.
    """
    tg_token = os.getenv("TG_TOKEN")
    if not tg_token:
        # Fail fast with a message that names the variable, rather than
        # handing None to the application builder.
        raise RuntimeError(
            "TG_TOKEN environment variable is not set; export the bot token before starting"
        )

    application = Application.builder().token(tg_token).build()

    # /start prints a short description; any non-command message with an
    # attachment is routed to the image processor.
    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, process))

    application.run_polling()


if __name__ == "__main__":
    main()