from pathlib import Path import json import hmac from typing import Annotated import secrets import random from enum import Enum from dataclasses import dataclass import io import datetime from contextlib import asynccontextmanager from typing import Any from collections.abc import AsyncGenerator, Sequence from sqlalchemy import select, String from sqlalchemy.exc import IntegrityError, NoResultFound from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from sqlalchemy.dialects.postgresql import ARRAY from litestar import Litestar, get, post, Request from litestar.contrib.jinja import JinjaTemplateEngine from litestar.exceptions import HTTPException, ValidationException, NotFoundException, NotAuthorizedException from litestar.template.config import TemplateConfig from litestar.response import Template, Redirect from litestar.response.file import ASGIFileResponse from litestar.response.streaming import ASGIStreamingResponse from litestar.static_files import create_static_files_router from litestar.enums import RequestEncodingType from litestar.params import Body from litestar import Request from litestar.datastructures import State import ics class Base(DeclarativeBase): pass class Event(Base): __tablename__ = "events" def __init__(self, iden, password, title, time, location, description): self.iden = iden self.password = password self.title = title self.time = time self.location = location self.description = description self.invites = '[]' iden: Mapped[str] = mapped_column(primary_key=True) password: Mapped[str] title: Mapped[str] time: Mapped[datetime.datetime] location: Mapped[str] description: Mapped[str] invites: Mapped[str] def get_invites(self): return json.loads(self.invites) def to_ics(self): fmt = "%Y%m%dT%H%M%S" start = self.time.strftime(fmt) now = datetime.datetime.now().strftime(fmt) return f'''BEGIN:VCALENDAR VERSION:2.0 PRODID:custom BEGIN:VEVENT DESCRIPTION:{self.description} LOCATION:{self.location} DTSTART:{start} DTSTAMP:{now} SUMMARY:{self.title} UID:{self.iden} END:VEVENT END:VCALENDAR''' @asynccontextmanager async def db_connection(app: Litestar) -> AsyncGenerator[None, None]: engine = getattr(app.state, "engine", None) if engine is None: engine = create_async_engine("sqlite+aiosqlite:///symposium.sqlite", echo=True) app.state.engine = engine async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) try: yield finally: await engine.dispose() sessionmaker = async_sessionmaker(expire_on_commit=False) parts = [ "Oxbow", "Limber", "Steadfast", "Regicide", "Swarm", "Weave", "Bough", "Canopy", "Herald", "Scorn", "Alder", "Aerial", "Welkin", "Acrid", "Kindling", "Rapture", "Myrtle", "Envy", "Solstice", "Juniper", "Cleaving", "Stream", "Reaper", "Sluice", "Conduit", "Disdain", "Sylvan", "Ravish", "Atrium", "Thresh", "Harvest", "Water", "Renewal", "Rosy", "Frieze", "Portal", "Vespers", "Litany", "Serpent", "Primate", "Incite", "Canon", "Acquiese", "Mirror", "Script", "Seal", "Privy", "Piercing", "Heresy", "Subduct", "Sceptre", "Arrogance", "Ivory", "Accrete", "Cluster", "Sepulchre", "Summon", "Pleading", "Myriad", "Exalted", "Sentry", "Shriven", "River", "Threshold" ] def gen_iden(): return "-".join(random.choices(parts, k=8)) def error(msg) -> Template: return Template(template_name="error.html", context=dict(error=msg)) def auth_error() -> Template: return error("Unauthorized.") def check_auth(password, event): return password and hmac.compare_digest(event.password, password) @get("/") async def index() -> Template: return Template(template_name="index.html") @get("/event/{iden:str}", name="event") async def event(state: State, iden: str, password: str = "") -> Template: async with sessionmaker(bind=state.engine) as session: async with session.begin(): query = select(Event).where(Event.iden == iden) result = await session.execute(query) try: event = result.scalar_one() except NoResultFound: return error("Not found.") manage = False if password: if not check_auth(password, event): return auth_error() manage = True context = dict(event=event, manage=manage) return Template(template_name="event.html", context=context) @get("/event/{iden:str}/calendar") async def calendar(state: State, iden: str) -> ASGIStreamingResponse: async with sessionmaker(bind=state.engine) as session: async with session.begin(): query = select(Event).where(Event.iden == iden) result = await session.execute(query) try: event = result.scalar_one() except NoResultFound: return error("Not found.") ics = event.to_ics() f = io.StringIO(ics) return ASGIStreamingResponse(iterator=f, media_type='text/plain', headers={ 'Content-Disposition': 'attachment; filename=event.ics', }) @dataclass class EditRequest: title: str when: str where: str what: str @post("/event/create") async def create(request: Request, state: State, data: Annotated[EditRequest, Body(media_type=RequestEncodingType.URL_ENCODED)]) -> Redirect: iden = gen_iden() password = secrets.token_bytes(16).hex() event = Event(iden, password, data.title, datetime.datetime.fromisoformat(data.when), data.where, data.what) async with sessionmaker(bind=state.engine) as session: try: async with session.begin(): session.add(event) except IntegrityError: return error("Iden already exists.") return Redirect("/symposium" + request.app.route_reverse('event', iden=iden) + "?password=" + event.password) @post("/event/{iden:str}/edit") async def edit(state: State, request: Request, iden: str, password: str, data: Annotated[EditRequest, Body(media_type=RequestEncodingType.URL_ENCODED)]) -> Redirect: async with sessionmaker(bind=state.engine) as session: async with session.begin(): query = select(Event).where(Event.iden == iden) result = await session.execute(query) try: event = result.scalar_one() except NoResultFound: return error("Not found.") if not check_auth(password, event): return auth_error() event.title = data.title event.time = datetime.datetime.fromisoformat(data.when) event.location = data.where event.description = data.what return Redirect("/symposium" + request.app.route_reverse('event', iden=iden) + "?password=" + event.password) @dataclass class RemoveRequest: name: str @post("/event/{iden:str}/remove") async def remove(state: State, request: Request, iden: str, data: Annotated[RemoveRequest, Body(media_type=RequestEncodingType.URL_ENCODED)], password: str = "") -> Redirect: async with sessionmaker(bind=state.engine) as session: async with session.begin(): query = select(Event).where(Event.iden == iden) result = await session.execute(query) try: event = result.scalar_one() except NoResultFound: return error("Not found.") if not check_auth(password, event): return auth_error() name = data.name invites = json.loads(event.invites) if name in invites: invites.remove(name) event.invites = json.dumps(invites) return Redirect("/symposium" + request.app.route_reverse('event', iden=iden) + "?password=" + event.password) @dataclass class JoinRequest: name: str @post("/event/{iden:str}/join") async def join(state: State, request: Request, iden: str, data: Annotated[JoinRequest, Body(media_type=RequestEncodingType.URL_ENCODED)], password: str = "") -> Redirect: async with sessionmaker(bind=state.engine) as session: async with session.begin(): query = select(Event).where(Event.iden == iden) result = await session.execute(query) try: event = result.scalar_one() except NoResultFound: return error("Not found.") name = data.name if len(name) > 50: return error("Name too long.") if len(name) == 0: return error("Name too short.") invites = json.loads(event.invites) if name in invites: return error("Name already exists in event.") invites.append(name) event.invites = json.dumps(invites) url = "/symposium" + request.app.route_reverse('event', iden=iden) if password: url += "?password=" + password return Redirect(path=url) app = Litestar( route_handlers=[ index, event, calendar, create, edit, remove, join, create_static_files_router(path='/static', directories=['static']), ], template_config=TemplateConfig( directory=Path("templates"), engine=JinjaTemplateEngine, ), lifespan=[db_connection], )