1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
from os import listdir
from argparse import ArgumentParser
import json
import asyncio
import websockets
from websockets.exceptions import ConnectionClosedOK
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
"""
This is best served by an nginx block that should look a bit like this, with
a halfnarp2 checked out in the home of user halfnarp:
location /fullnarp/ {
alias /home/halfnarp/halfnarp2/static/;
index fullnarp.html;
}
location /fullnarp/versions {
alias /home/halfnarp/halfnarp2/var/versions/;
}
location /fullnarp/ws/ {
proxy_pass http://127.0.0.1:5042;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Set keepalive timeout
proxy_read_timeout 60; # Set a read timeout to prevent connection closures
proxy_send_timeout 60; # Set a send timeout to prevent connection closures
}
When importing talks from pretalx with halfnarp2.py -i, it creates a file in
var/talks_local_fullnarp which contains non-public lectures and privacy-relevant
speaker availabilities. It should only be served behind some auth.
"""
# Global placeholders for engine and session factory; both are assigned in
# main() after the config file has been read.
engine = None
SessionLocal = None
# Declarative base class from which the ORM models in this file derive.
Base = declarative_base()
# Prefix of the "version_url" sent to clients; main() overwrites it with the
# "fullnarp-path" config value.
fullnarp_path = ""
class TalkPreference(Base):
    """ORM model for one row of the ``talk_preference`` table.

    ``talk_ids`` is stored as a string; the bootstrap action in
    ``handle_client`` decodes it with ``json.loads`` before sending it on.
    """

    __tablename__ = "talk_preference"
    # Primary key of the row.
    uid = Column(String, primary_key=True)
    # Indexed secondary identifier; not read by the code visible in this file.
    public_uid = Column(String, index=True)
    # JSON-encoded string (decoded with json.loads by the bootstrap action).
    talk_ids = Column(String)
# Shared state
# Maps event id -> {"day", "room", "time", "lastupdate"}; entries are written
# by the set_event action in handle_client() and reloaded from disk in main().
current_version = {}
# Number of the most recent state version; incremented by set_event.
newest_version = 0
# NOTE(review): this lock is created at import time, before asyncio.run()
# starts the event loop. On Python < 3.10 asyncio.Lock binds to the loop that
# is current at construction time — confirm the minimum supported version.
current_version_lock = asyncio.Lock()  # Lock for managing access to the global state
clients = {}  # Key: websocket, Value: {'client_id': ..., 'last_version': ...}
async def bootstrap_client(websocket):
    """Placeholder for providing the lecture list and vote counts to a client.

    The coroutine has no logic of its own: ``websocket`` is accepted but not
    used, and awaiting the coroutine simply yields ``None``. The bootstrap
    data is currently sent by the "bootstrap" action in ``handle_client``.
    """
    return None
async def notify_clients():
    """Notify all connected clients of the current state.

    While holding ``current_version_lock``, builds one "fullnarp" message
    from the shared state (``newest_version``, ``current_version``,
    ``fullnarp_path``) and sends it to every websocket registered in
    ``clients``. After a successful send the client's ``last_version`` entry
    is set to ``newest_version``.

    A client whose connection is already closed (cleanly or abnormally) is
    skipped so that the remaining clients still receive the update.
    """
    async with current_version_lock:
        # Prepare a full state update message with the current version.
        # ``fullnarp_path`` may be None when the config lacks "fullnarp-path".
        version_url = (
            f"{fullnarp_path or ''}versions/fullnarp_"
            f"{str(newest_version).zfill(5)}.json"
        )
        message = {
            "property": "fullnarp",
            "current_version": newest_version,
            "version_url": version_url,
            "data": current_version,
        }
        # The payload is identical for every client: serialize it once.
        payload = json.dumps(message)

        # Iterate over a snapshot. Each ``await`` below yields to the event
        # loop, where handle_client() may register or remove a connection;
        # mutating the dict while iterating it raises RuntimeError.
        for client, info in list(clients.items()):
            try:
                # Send the state update
                await client.send(payload)
            except websockets.exceptions.ConnectionClosed:
                # Base class of ConnectionClosedOK and ConnectionClosedError:
                # the peer is gone, and its own handler task removes it from
                # ``clients``. Carry on with the other clients.
                continue
            # Update the client's last known version
            info["last_version"] = newest_version
            print("Reply: " + payload)
def _fullnarp_state_message(include_url):
    """Build the "fullnarp" state message; caller holds current_version_lock."""
    message = {
        "property": "fullnarp",
        "current_version": newest_version,
    }
    if include_url:
        message["version_url"] = (
            f"{fullnarp_path or ''}versions/fullnarp_"
            f"{str(newest_version).zfill(5)}.json"
        )
    message["data"] = current_version
    return message


async def _send_bootstrap(websocket):
    """Send the talk list, the stored preferences and the current state."""
    print("Got bootstrap command")
    with open("var/talks_local_fullnarp") as data_file:
        talks = json.load(data_file)
    await websocket.send(json.dumps({"property": "pretalx", "data": talks}))

    with SessionLocal() as session:
        # Decode inside the session so the rows are still attached.
        votes = [
            json.loads(pref.talk_ids)
            for pref in session.query(TalkPreference).all()
        ]
    await websocket.send(json.dumps({"property": "halfnarp", "data": votes}))

    async with current_version_lock:
        reply = json.dumps(_fullnarp_state_message(include_url=True))
        await websocket.send(reply)
    print("Reply: " + reply)


def _parse_set_event(data):
    """Return (event_id, day, room, time, lastupdate), or None if malformed."""
    try:
        fields = tuple(data[key] for key in ("event_id", "day", "room", "time"))
        lastupdate = data["lastupdate"]
    except KeyError:
        return None
    # The four fields are used as dict key / log text and must be strings.
    if not all(isinstance(value, str) for value in fields):
        return None
    return (*fields, lastupdate)


async def _apply_set_event(eventid, day, room, slot_time, lastupdate):
    """Record a new version for a moved event; return False on bad lastupdate.

    The version counter is always incremented. The event itself is only
    updated (and the version file written) when the event is unknown or the
    client's ``lastupdate`` is not older than the stored one.
    """
    global newest_version
    async with current_version_lock:
        known = current_version.get(eventid)
        try:
            # Validate before touching any state so a bad value cannot leave
            # the version counter bumped without a matching update.
            accepted = known is None or int(known["lastupdate"]) <= int(lastupdate)
        except (TypeError, ValueError):
            return False

        newest_version += 1  # Increment the version
        print(
            f"Moving event: {eventid} to day {day} at {slot_time}"
            f" in room {room} newcurrentversion {newest_version}"
        )
        if accepted:
            current_version[eventid] = {
                "day": day,
                "room": room,
                "time": slot_time,
                "lastupdate": int(newest_version),
            }
            version_file = (
                "var/versions/fullnarp_" + str(newest_version).zfill(5) + ".json"
            )
            with open(version_file, "w") as outfile:
                json.dump(current_version, outfile)
    return True


async def handle_client(websocket):
    """Handle one incoming WebSocket connection.

    Registers the connection in ``clients``, then processes JSON messages
    until the peer disconnects. The ``action`` field selects the operation:

    * ``bootstrap`` - send the talk list, the stored preferences and the
      current state to this client.
    * ``reconnect`` - send only the current state to this client.
    * ``remove_event`` - accepted but not implemented.
    * ``set_event`` - move an event, persist the new version and notify all
      clients.

    Messages that are not valid JSON are answered with
    ``{"error": "Invalid JSON"}``; messages that are not a JSON object, or
    ``set_event`` messages with missing or malformed fields, are answered with
    ``{"error": "Invalid message"}``. In both cases the connection stays open.
    The connection is removed from ``clients`` when the handler ends.
    """
    # Initialize per-connection state
    clients[websocket] = {"client_id": id(websocket), "last_version": 0}
    try:
        # Listen for updates from the client
        async for raw_message in websocket:
            # Only the parsing of the client's message is guarded here, so
            # that server-side JSON problems are not blamed on the client.
            try:
                data = json.loads(raw_message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                await websocket.send(json.dumps({"error": "Invalid JSON"}))
                continue
            print(f"Got command {raw_message}")

            if not isinstance(data, dict):
                await websocket.send(json.dumps({"error": "Invalid message"}))
                continue

            action = data.get("action", "")
            if action == "bootstrap":
                await _send_bootstrap(websocket)
            elif action == "reconnect":
                async with current_version_lock:
                    state = _fullnarp_state_message(include_url=False)
                    await websocket.send(json.dumps(state))
            elif action == "remove_event":
                pass
            elif action == "set_event":
                parsed = _parse_set_event(data)
                if parsed is None or not await _apply_set_event(*parsed):
                    await websocket.send(json.dumps({"error": "Invalid message"}))
                    continue
                # Notify all clients about the updated global state. This must
                # happen after the lock is released: notify_clients() takes
                # the same (non-reentrant) lock.
                await notify_clients()
    except websockets.exceptions.ConnectionClosedError as e:
        print(f"Client disconnected abruptly: {e}")
    except ConnectionClosedOK:
        pass
    finally:
        # Cleanup when the client disconnects
        clients.pop(websocket, None)
def _latest_version_file(directory):
    """Return (version, filename) of the newest state file, or None if absent."""
    latest = None
    for name in listdir(directory):
        if not (name.startswith("fullnarp_") and name.endswith(".json")):
            continue  # ignore unrelated files in the directory
        digits = name[len("fullnarp_"):-len(".json")]
        if not (digits.isascii() and digits.isdigit()):
            continue
        candidate = (int(digits), name)
        if latest is None or candidate > latest:
            latest = candidate
    return latest


async def main():
    """Read the config, set up the database, restore state and serve forever.

    Command line: ``-c/--config`` names the JSON config file (default
    ``./config.json``). Config keys read here: ``database-uri``,
    ``fullnarp-path``, ``websocket-host`` and ``websocket-port``.

    The newest ``var/versions/fullnarp_<n>.json`` file, if any, is loaded
    into ``current_version`` and its number into ``newest_version``;
    otherwise the server starts with an empty state at version 0.
    """
    from os import makedirs

    global engine, SessionLocal, fullnarp_path
    global newest_version, current_version

    parser = ArgumentParser(description="fullnarp")
    parser.add_argument(
        "-c", "--config", help="Config file location", default="./config.json"
    )
    args = parser.parse_args()

    with open(args.config, mode="r", encoding="utf-8") as json_file:
        config = json.load(json_file)

    database_url = config.get("database-uri", "sqlite:///test.db")
    # Fall back to "" so that building the version URL never sees None.
    fullnarp_path = config.get("fullnarp-path") or ""

    print(f"Connecting to {database_url}")
    engine = create_engine(database_url, echo=False)
    SessionLocal = sessionmaker(bind=engine)
    Base.metadata.create_all(bind=engine)

    # load state file
    versions_dir = "var/versions/"
    # The set_event handler writes into this directory; make sure it exists
    # so a fresh checkout neither fails here nor on the first update.
    makedirs(versions_dir, exist_ok=True)
    latest = _latest_version_file(versions_dir)
    if latest is not None:
        newest_version, newest_file = latest
        print("Resuming from version: " + str(newest_version))
        with open(versions_dir + newest_file) as data_file:
            current_version = json.load(data_file)
    else:
        current_version = {}
        newest_version = 0

    ws_host = config.get("websocket-host", "localhost")
    ws_port = config.get("websocket-port", 5042)
    async with websockets.serve(handle_client, ws_host, ws_port):
        print(f"WebSocket server started on ws://{ws_host}:{ws_port}")
        await asyncio.Future()  # Run forever
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|