from peewee import *
TOOT_VISIBILITY_PUBLIC = "public"
TOOT_VISIBILITY_UNLISTED = "unlisted"
TOOT_VISIBILITY_PRIVATE = "private"
db = SqliteDatabase("checkinbot.db")
db.connect(reuse_if_open=True)
class BaseModel(Model):
class Meta:
database = db
class User(BaseModel):
telegram_user_id = CharField(unique=True, primary_key=True)
access_key = CharField(max_length=256)
home_instance = CharField(max_length=256)
home_instance_type = CharField(max_length=128, default="mastodon")
state = CharField(max_length=128)
client_id = CharField(max_length=128)
client_secret = CharField(max_length=128)
toot_visibility = CharField(max_length=128, default=TOOT_VISIBILITY_PRIVATE)
def update_user_visibility(telegram_user_id: str, visibility: str) -> int:
with db.connection_context():
return User.update(toot_visibility=visibility).where(
User.telegram_user_id == telegram_user_id
).execute()
def get_user_by_id(telegram_user_id: str) -> dict:
with db.connection_context():
try:
user = User.get(User.telegram_user_id == telegram_user_id)
return {
"telegram_user_id": user.telegram_user_id,
"access_key": user.access_key,
"home_instance": user.home_instance,
"home_instance_type": user.home_instance_type,
"state": user.state,
"client_id": user.client_id,
"client_secret": user.client_secret,
"toot_visibility": user.toot_visibility,
}
except DoesNotExist:
return {}
def get_user_by_state(state: str) -> dict:
with db.connection_context():
try:
user = User.get(User.state == state)
return {
"telegram_user_id": user.telegram_user_id,
"access_key": user.access_key,
"home_instance": user.home_instance,
"home_instance_type": user.home_instance_type,
"state": user.state,
"client_id": user.client_id,
"client_secret": user.client_secret,
"toot_visibility": user.toot_visibility,
}
except DoesNotExist:
return {}
def get_user_access_key(telegram_user_id: str) -> str:
with db.connection_context():
try:
user = User.get(User.telegram_user_id == telegram_user_id)
return user.access_key
except DoesNotExist:
return ""
def delete_user_by_id(telegram_user_id: str) -> int:
with db.connection_context():
return User.delete().where(User.telegram_user_id == telegram_user_id).execute()
def get_user_home_instance(telegram_user_id: str) -> dict:
with db.connection_context():
try:
user = User.get(User.telegram_user_id == telegram_user_id)
return {
"home_instance": user.home_instance,
"home_instance_type": user.home_instance_type,
"default_visibility": user.toot_visibility,
}
except DoesNotExist:
return {}
class Location(BaseModel):
fsq_id = CharField(unique=True, primary_key=True)
name = CharField(max_length=128)
locality = CharField(max_length=128)
region = CharField(max_length=128)
latitude = CharField(max_length=128)
longitude = CharField(max_length=128)
def get_poi_by_fsq_id(fsq_id) -> dict:
with db.connection_context():
try:
poi = Location.get(Location.fsq_id == fsq_id)
return {
"name": poi.name,
"locality": poi.locality,
"region": poi.region,
"latitude": poi.latitude,
"longitude": poi.longitude,
}
except DoesNotExist:
return {}
def create_or_update_poi(poi: dict) -> int:
with db.connection_context():
return Location.insert(
fsq_id=poi["fsq_id"],
name=poi["name"],
locality=poi["locality"],
region=poi["region"],
latitude=poi["latitude"],
longitude=poi["longitude"],
).on_conflict_replace().execute()
with db.connection_context():
db.create_tables([User, Location])