summaryrefslogtreecommitdiff
path: root/ot_livesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'ot_livesync.c')
-rw-r--r--ot_livesync.c292
1 files changed, 36 insertions, 256 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index 9e1c723..87fe5cf 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -33,23 +33,9 @@ char groupip_1[4] = { 224,0,23,5 };
33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
35 35
36#ifdef WANT_SYNC_SCRAPE
37#define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504
38#define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t))
39#define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100
40
41#define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */
42#define LIVESYNC_BEACON_INTERVAL 60 /* seconds */
43#define LIVESYNC_INQUIRE_THRESH 0.75
44#endif /* WANT_SYNC_SCRAPE */
45
46#define LIVESYNC_MAXDELAY 15 /* seconds */ 36#define LIVESYNC_MAXDELAY 15 /* seconds */
47 37
48enum { OT_SYNC_PEER 38enum { OT_SYNC_PEER };
49#ifdef WANT_SYNC_SCRAPE
50 , OT_SYNC_SCRAPE_BEACON, OT_SYNC_SCRAPE_INQUIRE, OT_SYNC_SCRAPE_TELL
51#endif
52};
53 39
54/* Forward declaration */ 40/* Forward declaration */
55static void * livesync_worker( void * args ); 41static void * livesync_worker( void * args );
@@ -59,52 +45,24 @@ static int64 g_socket_in = -1;
59 45
60/* For incoming packets */ 46/* For incoming packets */
61static int64 g_socket_out = -1; 47static int64 g_socket_out = -1;
62static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
63
64static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
65static uint8_t *g_peerbuffer_pos;
66static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
67 48
49char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
50static size_t g_outbuf_data;
68static ot_time g_next_packet_time; 51static ot_time g_next_packet_time;
69 52
70#ifdef WANT_SYNC_SCRAPE
71/* Live sync scrape buffers, states and timers */
72static ot_time g_next_beacon_time;
73static ot_time g_next_inquire_time;
74
75static uint8_t g_scrapebuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE];
76static uint8_t *g_scrapebuffer_pos;
77static uint8_t *g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE;
78
79static size_t g_inquire_remote_count;
80static uint32_t g_inquire_remote_host;
81static int g_inquire_inprogress;
82static int g_inquire_bucket;
83#endif /* WANT_SYNC_SCRAPE */
84
85static pthread_t thread_id; 53static pthread_t thread_id;
86void livesync_init( ) { 54void livesync_init( ) {
55
87 if( g_socket_in == -1 ) 56 if( g_socket_in == -1 )
88 exerr( "No socket address for live sync specified." ); 57 exerr( "No socket address for live sync specified." );
89 58
90 /* Prepare outgoing peers buffer */ 59 /* Prepare outgoing peers buffer */
91 g_peerbuffer_pos = g_peerbuffer_start; 60 memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) );
92 memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 61 uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER);
93 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); 62 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
94 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
95
96#ifdef WANT_SYNC_SCRAPE
97 /* Prepare outgoing scrape buffer */
98 g_scrapebuffer_pos = g_scrapebuffer_start;
99 memcpy( g_scrapebuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
100 uint32_pack_big( (char*)g_scrapebuffer_pos + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_TELL);
101 g_scrapebuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
102
103 /* Wind up timers for inquires */
104 g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY;
105#endif /* WANT_SYNC_SCRAPE */
106 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
107 63
64 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
65
108 pthread_create( &thread_id, NULL, livesync_worker, NULL ); 66 pthread_create( &thread_id, NULL, livesync_worker, NULL );
109} 67}
110 68
@@ -148,264 +106,86 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
148} 106}
149 107
150static void livesync_issue_peersync( ) { 108static void livesync_issue_peersync( ) {
151 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, 109 socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT);
152 groupip_1, LIVESYNC_PORT); 110 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
153 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
154 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 111 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
155} 112}
156 113
157static void livesync_handle_peersync( ssize_t datalen ) { 114static void livesync_handle_peersync( struct ot_workstruct *ws ) {
158 int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 115 int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
159 116
160 /* Now basic sanity checks have been done on the live sync packet 117 /* Now basic sanity checks have been done on the live sync packet
161 We might add more testing and logging. */ 118 We might add more testing and logging. */
162 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { 119 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= ws->request_size ) {
163 ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); 120 memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), sizeof( ot_peer ) );
164 ot_hash *hash = (ot_hash*)(g_inbuffer + off); 121 ws->hash = (ot_hash*)(ws->request + off);
165 122
166 if( !g_opentracker_running ) return; 123 if( !g_opentracker_running ) return;
167 124
168 if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) 125 if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_STOPPED )
169 remove_peer_from_torrent( *hash, peer, NULL, FLAG_MCA ); 126 remove_peer_from_torrent( FLAG_MCA, ws );
170 else 127 else
171 add_peer_to_torrent( *hash, peer, FLAG_MCA ); 128 add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 );
172 129
173 off += sizeof( ot_hash ) + sizeof( ot_peer ); 130 off += sizeof( ot_hash ) + sizeof( ot_peer );
174 } 131 }
175 132
176 stats_issue_event(EVENT_SYNC, 0, 133 stats_issue_event(EVENT_SYNC, 0,
177 (datalen - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / 134 (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
178 ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); 135 ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer )));
179} 136}
180 137
181#ifdef WANT_SYNC_SCRAPE
182void livesync_issue_beacon( ) {
183 size_t torrent_count = mutex_get_torrent_count();
184 uint8_t beacon[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ];
185
186 memcpy( beacon, &g_tracker_id, sizeof( g_tracker_id ) );
187 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_BEACON);
188 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + sizeof(uint32_t), (uint32_t)((uint64_t)(torrent_count)>>32) );
189 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + 2 * sizeof(uint32_t), (uint32_t)torrent_count );
190
191 socket_send4(g_socket_out, (char*)beacon, sizeof(beacon), groupip_1, LIVESYNC_PORT);
192}
193
194void livesync_handle_beacon( ssize_t datalen ) {
195 size_t torrent_count_local, torrent_count_remote;
196 if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) )
197 return;
198 torrent_count_local = mutex_get_torrent_count();
199 torrent_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + sizeof(uint32_t))) << 32);
200 torrent_count_remote |= (size_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + 2 * sizeof(uint32_t));
201
202 /* Empty tracker is useless */
203 if( !torrent_count_remote ) return;
204
205 if( ((double)torrent_count_local ) / ((double)torrent_count_remote) < LIVESYNC_INQUIRE_THRESH) {
206 if( !g_next_inquire_time ) {
207 g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL;
208 g_inquire_remote_count = 0;
209 }
210
211 if( torrent_count_remote > g_inquire_remote_count ) {
212 g_inquire_remote_count = torrent_count_remote;
213 memcpy( &g_inquire_remote_host, g_inbuffer, sizeof( g_tracker_id ) );
214 }
215 }
216}
217
218void livesync_issue_inquire( ) {
219 uint8_t inquire[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id)];
220
221 memcpy( inquire, &g_tracker_id, sizeof( g_tracker_id ) );
222 uint32_pack_big( (char*)inquire + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_INQUIRE);
223 memcpy( inquire + sizeof(g_tracker_id) + sizeof(uint32_t), &g_inquire_remote_host, sizeof( g_tracker_id ) );
224
225 socket_send4(g_socket_out, (char*)inquire, sizeof(inquire), groupip_1, LIVESYNC_PORT);
226}
227
228void livesync_handle_inquire( ssize_t datalen ) {
229 if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id) )
230 return;
231
232 /* If it isn't us, they're inquiring, ignore inquiry */
233 if( memcmp( &g_tracker_id, g_inbuffer, sizeof( g_tracker_id ) ) )
234 return;
235
236 /* Start scrape tell on next ticker */
237 if( !g_inquire_inprogress ) {
238 g_inquire_inprogress = 1;
239 g_inquire_bucket = 0;
240 }
241}
242
243void livesync_issue_tell( ) {
244 int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE;
245 while( packets_to_send > 0 && g_inquire_bucket < OT_BUCKET_COUNT ) {
246 ot_vector *torrents_list = mutex_bucket_lock( g_inquire_bucket );
247 unsigned int j;
248 for( j=0; j<torrents_list->size; ++j ) {
249 ot_torrent *torrent = (ot_torrent*)(torrents_list->data) + j;
250 memcpy(g_scrapebuffer_pos, torrent->hash, sizeof(ot_hash));
251 g_scrapebuffer_pos += sizeof(ot_hash);
252 uint32_pack_big( (char*)g_scrapebuffer_pos , (uint32_t)(g_now_minutes - torrent->peer_list->base ));
253 uint32_pack_big( (char*)g_scrapebuffer_pos + 4, (uint32_t)((uint64_t)(torrent->peer_list->down_count)>>32) );
254 uint32_pack_big( (char*)g_scrapebuffer_pos + 8, (uint32_t)torrent->peer_list->down_count );
255 g_scrapebuffer_pos += 12;
256
257 if( g_scrapebuffer_pos >= g_scrapebuffer_highwater ) {
258 socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT);
259 g_scrapebuffer_pos = g_scrapebuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t);
260 --packets_to_send;
261 }
262 }
263 mutex_bucket_unlock( g_inquire_bucket++, 0 );
264 if( !g_opentracker_running )
265 return;
266 }
267 if( g_inquire_bucket == OT_BUCKET_COUNT ) {
268 socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT);
269 g_inquire_inprogress = 0;
270 }
271}
272
273void livesync_handle_tell( ssize_t datalen ) {
274 int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
275
276 /* Some instance is in progress of telling. Our inquiry was successful.
277 Don't ask again until we see next beacon. */
278 g_next_inquire_time = 0;
279
280 /* Don't cause any new inquiries during another tracker's tell */
281 if( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL )
282 g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
283
284 while( off + sizeof(ot_hash) + 12 <= (size_t)datalen ) {
285 ot_hash *hash = (ot_hash*)(g_inbuffer+off);
286 ot_vector *torrents_list = mutex_bucket_lock_by_hash(*hash);
287 size_t down_count_remote;
288 int exactmatch;
289 ot_torrent *torrent = vector_find_or_insert(torrents_list, hash, sizeof(ot_hash), OT_HASH_COMPARE_SIZE, &exactmatch);
290
291 if( !torrent ) {
292 mutex_bucket_unlock_by_hash( *hash, 0 );
293 continue;
294 }
295
296 if( !exactmatch ) {
297 /* Create a new torrent entry, then */
298 memcpy( &torrent->hash, hash, sizeof(ot_hash));
299
300 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
301 vector_remove_torrent( torrents_list, torrent );
302 mutex_bucket_unlock_by_hash( *hash, 0 );
303 continue;
304 }
305
306 byte_zero( torrent->peer_list, sizeof( ot_peerlist ) );
307 torrent->peer_list->base = g_now_minutes - uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash));
308 }
309
310 down_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + sizeof(uint32_t))) << 32);
311 down_count_remote |= (size_t) uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + 2 * sizeof(uint32_t));
312
313 if( down_count_remote > torrent->peer_list->down_count )
314 torrent->peer_list->down_count = down_count_remote;
315 /* else
316 We might think of sending a tell packet, if we have a much larger downloaded count
317 */
318
319 mutex_bucket_unlock( g_inquire_bucket++, exactmatch?0:1 );
320 if( !g_opentracker_running )
321 return;
322 off += sizeof(ot_hash) + 12;
323 }
324}
325#endif /* WANT_SYNC_SCRAPE */
326
327/* Tickle the live sync module from time to time, so no events get 138/* Tickle the live sync module from time to time, so no events get
328 stuck when there's not enough traffic to fill udp packets fast 139 stuck when there's not enough traffic to fill udp packets fast
329 enough */ 140 enough */
330void livesync_ticker( ) { 141void livesync_ticker( ) {
331
332 /* livesync_issue_peersync sets g_next_packet_time */ 142 /* livesync_issue_peersync sets g_next_packet_time */
333 if( g_now_seconds > g_next_packet_time && 143 if( g_now_seconds > g_next_packet_time &&
334 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) 144 g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
335 livesync_issue_peersync(); 145 livesync_issue_peersync();
336
337#ifdef WANT_SYNC_SCRAPE
338 /* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY
339 seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */
340 if( g_now_seconds > g_next_beacon_time ) {
341 livesync_issue_beacon( );
342 g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
343 }
344
345 /* If we're interested in an inquiry and waited long enough to see all
346 tracker's beacons, go ahead and inquire */
347 if( g_next_inquire_time && g_now_seconds > g_next_inquire_time ) {
348 livesync_issue_inquire();
349
350 /* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */
351 g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
352 }
353
354 /* If we're in process of telling, let's tell. */
355 if( g_inquire_inprogress )
356 livesync_issue_tell( );
357
358#endif /* WANT_SYNC_SCRAPE */
359} 146}
360 147
361/* Inform live sync about whats going on. */ 148/* Inform live sync about whats going on. */
362void livesync_tell( ot_hash const info_hash, const ot_peer * const peer ) { 149void livesync_tell( struct ot_workstruct *ws ) {
363 150
364 memcpy( g_peerbuffer_pos, info_hash, sizeof(ot_hash) ); 151 memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) );
365 memcpy( g_peerbuffer_pos+sizeof(ot_hash), peer, sizeof(ot_peer) ); 152 memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) );
366 153
367 g_peerbuffer_pos += sizeof(ot_hash)+sizeof(ot_peer); 154 g_outbuf_data += sizeof(ot_hash) + sizeof(ot_peer);
368 155
369 if( g_peerbuffer_pos >= g_peerbuffer_highwater ) 156 if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
370 livesync_issue_peersync(); 157 livesync_issue_peersync();
371} 158}
372 159
373static void * livesync_worker( void * args ) { 160static void * livesync_worker( void * args ) {
161 struct ot_workstruct ws;
374 ot_ip6 in_ip; uint16_t in_port; 162 ot_ip6 in_ip; uint16_t in_port;
375 ssize_t datalen;
376 163
377 (void)args; 164 (void)args;
378 165
166 /* Initialize our "thread local storage" */
167 ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE );
168 ws.outbuf = ws.reply = 0;
169
379 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); 170 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) );
380 171
381 while( 1 ) { 172 while( 1 ) {
382 datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 173 ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
383 174
384 /* Expect at least tracker id and packet type */ 175 /* Expect at least tracker id and packet type */
385 if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) 176 if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
386 continue; 177 continue;
387 if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) 178 if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC))
388 continue; 179 continue;
389 if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { 180 if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
390 /* TODO: log packet coming from ourselves */ 181 /* TODO: log packet coming from ourselves */
391 continue; 182 continue;
392 } 183 }
393 184
394 switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { 185 switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) {
395 case OT_SYNC_PEER: 186 case OT_SYNC_PEER:
396 livesync_handle_peersync( datalen ); 187 livesync_handle_peersync( &ws );
397 break;
398#ifdef WANT_SYNC_SCRAPE
399 case OT_SYNC_SCRAPE_BEACON:
400 livesync_handle_beacon( datalen );
401 break;
402 case OT_SYNC_SCRAPE_INQUIRE:
403 livesync_handle_inquire( datalen );
404 break;
405 case OT_SYNC_SCRAPE_TELL:
406 livesync_handle_tell( datalen );
407 break; 188 break;
408#endif /* WANT_SYNC_SCRAPE */
409 default: 189 default:
410 break; 190 break;
411 } 191 }