from peewee import * TOOT_VISIBILITY_PUBLIC = "public" TOOT_VISIBILITY_UNLISTED = "unlisted" TOOT_VISIBILITY_PRIVATE = "private" db = SqliteDatabase("database/checkinbot.db") db.connect(reuse_if_open=True) class BaseModel(Model): class Meta: database = db class User(BaseModel): telegram_user_id = CharField(unique=True, primary_key=True) access_key = CharField(max_length=256) home_instance = CharField(max_length=256) home_instance_type = CharField(max_length=128, default="mastodon") state = CharField(max_length=128) client_id = CharField(max_length=128) client_secret = CharField(max_length=128) toot_visibility = CharField(max_length=128, default=TOOT_VISIBILITY_PRIVATE) def update_user_visibility(telegram_user_id: str, visibility: str) -> int: with db.connection_context(): return User.update(toot_visibility=visibility).where( User.telegram_user_id == telegram_user_id ).execute() def get_user_by_id(telegram_user_id: str) -> dict: with db.connection_context(): try: user = User.get(User.telegram_user_id == telegram_user_id) return { "telegram_user_id": user.telegram_user_id, "access_key": user.access_key, "home_instance": user.home_instance, "home_instance_type": user.home_instance_type, "state": user.state, "client_id": user.client_id, "client_secret": user.client_secret, "toot_visibility": user.toot_visibility, } except DoesNotExist: return {} def get_user_by_state(state: str) -> dict: with db.connection_context(): try: user = User.get(User.state == state) return { "telegram_user_id": user.telegram_user_id, "access_key": user.access_key, "home_instance": user.home_instance, "home_instance_type": user.home_instance_type, "state": user.state, "client_id": user.client_id, "client_secret": user.client_secret, "toot_visibility": user.toot_visibility, } except DoesNotExist: return {} def get_user_access_key(telegram_user_id: str) -> str: with db.connection_context(): try: user = User.get(User.telegram_user_id == telegram_user_id) return user.access_key except DoesNotExist: return "" def delete_user_by_id(telegram_user_id: str) -> int: with db.connection_context(): return User.delete().where(User.telegram_user_id == telegram_user_id).execute() def get_user_home_instance(telegram_user_id: str) -> dict: with db.connection_context(): try: user = User.get(User.telegram_user_id == telegram_user_id) return { "home_instance": user.home_instance, "home_instance_type": user.home_instance_type, "default_visibility": user.toot_visibility, } except DoesNotExist: return {} class Location(BaseModel): fsq_id = CharField(unique=True, primary_key=True) name = CharField(max_length=128) locality = CharField(max_length=128) region = CharField(max_length=128) latitude = CharField(max_length=128) longitude = CharField(max_length=128) def get_poi_by_fsq_id(fsq_id) -> dict: with db.connection_context(): try: poi = Location.get(Location.fsq_id == fsq_id) return { "name": poi.name, "locality": poi.locality, "region": poi.region, "latitude": poi.latitude, "longitude": poi.longitude, } except DoesNotExist: return {} def create_or_update_poi(poi: dict) -> int: with db.connection_context(): return Location.insert( fsq_id=poi["fsq_id"], name=poi["name"], locality=poi["locality"], region=poi["region"], latitude=poi["latitude"], longitude=poi["longitude"], ).on_conflict_replace().execute() with db.connection_context(): db.create_tables([User, Location])