From 7f155dc09e2b8862d68ee40d514de16c064bf449 Mon Sep 17 00:00:00 2001 From: erdgeist Date: Sat, 4 Jan 2025 02:46:47 +0100 Subject: Get prototype working --- fullnarp.py | 143 +++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 107 insertions(+), 36 deletions(-) (limited to 'fullnarp.py') diff --git a/fullnarp.py b/fullnarp.py index 6a02b6e..a93ca0a 100644 --- a/fullnarp.py +++ b/fullnarp.py @@ -38,6 +38,19 @@ var/talks_local_fullnarp which contains non-public lectures and privacy relevant speaker availibilities. It should only be served behind some auth. """ +# Global placeholders for engine and session factory +engine = None +SessionLocal = None +Base = declarative_base() + + +class TalkPreference(Base): + __tablename__ = "talk_preference" + uid = Column(String, primary_key=True) + public_uid = Column(String, index=True) + talk_ids = Column(String) + + # Shared state current_version = {} newest_version = 0 @@ -46,22 +59,32 @@ current_version_lock = asyncio.Lock() # Lock for managing access to the global clients = {} # Key: websocket, Value: {'client_id': ..., 'last_version': ...} +async def bootstrap_client(websocket): + """Provide lecture list and votes count to new client""" + + async def notify_clients(): """Notify all connected clients of the current state.""" async with current_version_lock: # Prepare a full state update message with the current version - message = {"current_version": newest_version, "data": current_version} - - # Notify each client about their relevant updates - for client, info in clients.items(): - try: - # Send the state update - await client.send(json.dumps(message)) - # Update the client's last known version - info["last_version"] = newest_version - except ConnectionClosedOK: - # Handle disconnected clients gracefully - pass + message = { + "property": "fullnarp", + "current_version": newest_version, + "data": current_version, + } + + # Notify each client about their relevant updates + for client, info in clients.items(): + try: + # Send the state update + await client.send(json.dumps(message)) + # Update the client's last known version + info["last_version"] = newest_version + + print("Reply: " + json.dumps(message)) + except ConnectionClosedOK: + # Handle disconnected clients gracefully + pass async def handle_client(websocket): @@ -71,31 +94,59 @@ async def handle_client(websocket): clients[websocket] = {"client_id": id(websocket), "last_version": 0} try: - # Send the current global state to the newly connected client - async with current_version_lock: - global newest_version - await websocket.send( - json.dumps({"current_version": newest_version, "data": current_version}) - ) - clients[websocket][ - "last_version" - ] = newest_version # Update last known version - # Listen for updates from the client async for message in websocket: + global newest_version try: # Parse incoming message data = json.loads(message) - - # Update global state with a lock to prevent race conditions - async with current_version_lock: - if "setevent" in data: - eventid = data["setevent"] - day = data["day"] - room = data["room"] - time = data["time"] - lastupdate = data["lastupdate"] - + print("Got command " + message) + + if data.get("action", "") == "bootstrap": + print("Got bootstrap command") + with open("var/talks_local_fullnarp") as data_file: + talks = json.load(data_file) + message = {"property": "pretalx", "data": talks} + await websocket.send(json.dumps(message)) + + with SessionLocal() as session: + preferences = session.query(TalkPreference).all() + m = [] + for pref in preferences: + m.append(json.loads(pref.talk_ids)) + message = {"property": "halfnarp", "data": m} + await websocket.send(json.dumps(message)) + + async with current_version_lock: + message = { + "property": "fullnarp", + "current_version": newest_version, + "data": current_version, + } + await websocket.send(json.dumps(message)) + print("Reply: " + json.dumps(message)) + + elif data.get("action", "") == "reconnect": + + async with current_version_lock: + message = { + "property": "fullnarp", + "current_version": newest_version, + "data": current_version, + } + await websocket.send(json.dumps(message)) + + elif data.get("action", "") == "remove_event": + pass + + elif data.get("action", "") == "set_event": + eventid = data["event_id"] + day = data["day"] + room = data["room"] + time = data["time"] + lastupdate = data["lastupdate"] + + async with current_version_lock: newest_version += 1 # Increment the version print( "Moving event: " @@ -127,8 +178,9 @@ async def handle_client(websocket): ) as outfile: json.dump(current_version, outfile) - # Notify all clients about the updated global state - await notify_clients() + # Notify all clients about the updated global state + await notify_clients() + except json.JSONDecodeError: await websocket.send(json.dumps({"error": "Invalid JSON"})) except websockets.exceptions.ConnectionClosedError as e: @@ -141,6 +193,25 @@ async def handle_client(websocket): async def main(): + parser = ArgumentParser(description="halfnarp2") + parser.add_argument( + "-c", "--config", help="Config file location", default="./config.json" + ) + args = parser.parse_args() + + global engine, SessionLocal + + with open(args.config, mode="r", encoding="utf-8") as json_file: + config = json.load(json_file) + + DATABASE_URL = config.get("database-uri", "sqlite:///test.db") + + print("Connecting to " + DATABASE_URL) + engine = create_engine(DATABASE_URL, echo=False) + SessionLocal = sessionmaker(bind=engine) + Base.metadata.create_all(bind=engine) + + # load state file newest_file = sorted(listdir("versions/"))[-1] global newest_version global current_version @@ -154,8 +225,8 @@ async def main(): current_version = {} newest_version = 0 - async with websockets.serve(handle_client, "localhost", 5009): - print("WebSocket server started on ws://localhost:5009") + async with websockets.serve(handle_client, "localhost", 22378): + print("WebSocket server started on ws://localhost:22378") await asyncio.Future() # Run forever -- cgit v1.2.3