diff options
Diffstat (limited to 'fullnarp.py')
-rw-r--r-- | fullnarp.py | 143 |
1 files changed, 107 insertions, 36 deletions
diff --git a/fullnarp.py b/fullnarp.py index 6a02b6e..a93ca0a 100644 --- a/fullnarp.py +++ b/fullnarp.py | |||
@@ -38,6 +38,19 @@ var/talks_local_fullnarp which contains non-public lectures and privacy relevant | |||
38 | speaker availibilities. It should only be served behind some auth. | 38 | speaker availibilities. It should only be served behind some auth. |
39 | """ | 39 | """ |
40 | 40 | ||
41 | # Global placeholders for engine and session factory | ||
42 | engine = None | ||
43 | SessionLocal = None | ||
44 | Base = declarative_base() | ||
45 | |||
46 | |||
47 | class TalkPreference(Base): | ||
48 | __tablename__ = "talk_preference" | ||
49 | uid = Column(String, primary_key=True) | ||
50 | public_uid = Column(String, index=True) | ||
51 | talk_ids = Column(String) | ||
52 | |||
53 | |||
41 | # Shared state | 54 | # Shared state |
42 | current_version = {} | 55 | current_version = {} |
43 | newest_version = 0 | 56 | newest_version = 0 |
@@ -46,22 +59,32 @@ current_version_lock = asyncio.Lock() # Lock for managing access to the global | |||
46 | clients = {} # Key: websocket, Value: {'client_id': ..., 'last_version': ...} | 59 | clients = {} # Key: websocket, Value: {'client_id': ..., 'last_version': ...} |
47 | 60 | ||
48 | 61 | ||
62 | async def bootstrap_client(websocket): | ||
63 | """Provide lecture list and votes count to new client""" | ||
64 | |||
65 | |||
49 | async def notify_clients(): | 66 | async def notify_clients(): |
50 | """Notify all connected clients of the current state.""" | 67 | """Notify all connected clients of the current state.""" |
51 | async with current_version_lock: | 68 | async with current_version_lock: |
52 | # Prepare a full state update message with the current version | 69 | # Prepare a full state update message with the current version |
53 | message = {"current_version": newest_version, "data": current_version} | 70 | message = { |
54 | 71 | "property": "fullnarp", | |
55 | # Notify each client about their relevant updates | 72 | "current_version": newest_version, |
56 | for client, info in clients.items(): | 73 | "data": current_version, |
57 | try: | 74 | } |
58 | # Send the state update | 75 | |
59 | await client.send(json.dumps(message)) | 76 | # Notify each client about their relevant updates |
60 | # Update the client's last known version | 77 | for client, info in clients.items(): |
61 | info["last_version"] = newest_version | 78 | try: |
62 | except ConnectionClosedOK: | 79 | # Send the state update |
63 | # Handle disconnected clients gracefully | 80 | await client.send(json.dumps(message)) |
64 | pass | 81 | # Update the client's last known version |
82 | info["last_version"] = newest_version | ||
83 | |||
84 | print("Reply: " + json.dumps(message)) | ||
85 | except ConnectionClosedOK: | ||
86 | # Handle disconnected clients gracefully | ||
87 | pass | ||
65 | 88 | ||
66 | 89 | ||
67 | async def handle_client(websocket): | 90 | async def handle_client(websocket): |
@@ -71,31 +94,59 @@ async def handle_client(websocket): | |||
71 | clients[websocket] = {"client_id": id(websocket), "last_version": 0} | 94 | clients[websocket] = {"client_id": id(websocket), "last_version": 0} |
72 | 95 | ||
73 | try: | 96 | try: |
74 | # Send the current global state to the newly connected client | ||
75 | async with current_version_lock: | ||
76 | global newest_version | ||
77 | await websocket.send( | ||
78 | json.dumps({"current_version": newest_version, "data": current_version}) | ||
79 | ) | ||
80 | clients[websocket][ | ||
81 | "last_version" | ||
82 | ] = newest_version # Update last known version | ||
83 | |||
84 | # Listen for updates from the client | 97 | # Listen for updates from the client |
85 | async for message in websocket: | 98 | async for message in websocket: |
99 | global newest_version | ||
86 | try: | 100 | try: |
87 | # Parse incoming message | 101 | # Parse incoming message |
88 | data = json.loads(message) | 102 | data = json.loads(message) |
89 | 103 | print("Got command " + message) | |
90 | # Update global state with a lock to prevent race conditions | 104 | |
91 | async with current_version_lock: | 105 | if data.get("action", "") == "bootstrap": |
92 | if "setevent" in data: | 106 | print("Got bootstrap command") |
93 | eventid = data["setevent"] | 107 | with open("var/talks_local_fullnarp") as data_file: |
94 | day = data["day"] | 108 | talks = json.load(data_file) |
95 | room = data["room"] | 109 | message = {"property": "pretalx", "data": talks} |
96 | time = data["time"] | 110 | await websocket.send(json.dumps(message)) |
97 | lastupdate = data["lastupdate"] | 111 | |
98 | 112 | with SessionLocal() as session: | |
113 | preferences = session.query(TalkPreference).all() | ||
114 | m = [] | ||
115 | for pref in preferences: | ||
116 | m.append(json.loads(pref.talk_ids)) | ||
117 | message = {"property": "halfnarp", "data": m} | ||
118 | await websocket.send(json.dumps(message)) | ||
119 | |||
120 | async with current_version_lock: | ||
121 | message = { | ||
122 | "property": "fullnarp", | ||
123 | "current_version": newest_version, | ||
124 | "data": current_version, | ||
125 | } | ||
126 | await websocket.send(json.dumps(message)) | ||
127 | print("Reply: " + json.dumps(message)) | ||
128 | |||
129 | elif data.get("action", "") == "reconnect": | ||
130 | |||
131 | async with current_version_lock: | ||
132 | message = { | ||
133 | "property": "fullnarp", | ||
134 | "current_version": newest_version, | ||
135 | "data": current_version, | ||
136 | } | ||
137 | await websocket.send(json.dumps(message)) | ||
138 | |||
139 | elif data.get("action", "") == "remove_event": | ||
140 | pass | ||
141 | |||
142 | elif data.get("action", "") == "set_event": | ||
143 | eventid = data["event_id"] | ||
144 | day = data["day"] | ||
145 | room = data["room"] | ||
146 | time = data["time"] | ||
147 | lastupdate = data["lastupdate"] | ||
148 | |||
149 | async with current_version_lock: | ||
99 | newest_version += 1 # Increment the version | 150 | newest_version += 1 # Increment the version |
100 | print( | 151 | print( |
101 | "Moving event: " | 152 | "Moving event: " |
@@ -127,8 +178,9 @@ async def handle_client(websocket): | |||
127 | ) as outfile: | 178 | ) as outfile: |
128 | json.dump(current_version, outfile) | 179 | json.dump(current_version, outfile) |
129 | 180 | ||
130 | # Notify all clients about the updated global state | 181 | # Notify all clients about the updated global state |
131 | await notify_clients() | 182 | await notify_clients() |
183 | |||
132 | except json.JSONDecodeError: | 184 | except json.JSONDecodeError: |
133 | await websocket.send(json.dumps({"error": "Invalid JSON"})) | 185 | await websocket.send(json.dumps({"error": "Invalid JSON"})) |
134 | except websockets.exceptions.ConnectionClosedError as e: | 186 | except websockets.exceptions.ConnectionClosedError as e: |
@@ -141,6 +193,25 @@ async def handle_client(websocket): | |||
141 | 193 | ||
142 | 194 | ||
143 | async def main(): | 195 | async def main(): |
196 | parser = ArgumentParser(description="halfnarp2") | ||
197 | parser.add_argument( | ||
198 | "-c", "--config", help="Config file location", default="./config.json" | ||
199 | ) | ||
200 | args = parser.parse_args() | ||
201 | |||
202 | global engine, SessionLocal | ||
203 | |||
204 | with open(args.config, mode="r", encoding="utf-8") as json_file: | ||
205 | config = json.load(json_file) | ||
206 | |||
207 | DATABASE_URL = config.get("database-uri", "sqlite:///test.db") | ||
208 | |||
209 | print("Connecting to " + DATABASE_URL) | ||
210 | engine = create_engine(DATABASE_URL, echo=False) | ||
211 | SessionLocal = sessionmaker(bind=engine) | ||
212 | Base.metadata.create_all(bind=engine) | ||
213 | |||
214 | # load state file | ||
144 | newest_file = sorted(listdir("versions/"))[-1] | 215 | newest_file = sorted(listdir("versions/"))[-1] |
145 | global newest_version | 216 | global newest_version |
146 | global current_version | 217 | global current_version |
@@ -154,8 +225,8 @@ async def main(): | |||
154 | current_version = {} | 225 | current_version = {} |
155 | newest_version = 0 | 226 | newest_version = 0 |
156 | 227 | ||
157 | async with websockets.serve(handle_client, "localhost", 5009): | 228 | async with websockets.serve(handle_client, "localhost", 22378): |
158 | print("WebSocket server started on ws://localhost:5009") | 229 | print("WebSocket server started on ws://localhost:22378") |
159 | await asyncio.Future() # Run forever | 230 | await asyncio.Future() # Run forever |
160 | 231 | ||
161 | 232 | ||