summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ot_http.c45
-rw-r--r--ot_livesync.c292
-rw-r--r--ot_livesync.h40
-rw-r--r--ot_stats.c18
-rw-r--r--ot_udp.c39
-rw-r--r--trackerlogic.c92
-rw-r--r--trackerlogic.h38
7 files changed, 168 insertions, 396 deletions
diff --git a/ot_http.c b/ot_http.c
index c544468..567cba3 100644
--- a/ot_http.c
+++ b/ot_http.c
@@ -369,12 +369,11 @@ static ot_keywords keywords_announce[] = { { "port", 1 }, { "left", 2 }, { "even
369#ifdef WANT_FULLLOG_NETWORKS 369#ifdef WANT_FULLLOG_NETWORKS
370{ "lognet", 8 }, 370{ "lognet", 8 },
371#endif 371#endif
372{ "peer_id", 9 },
372{ NULL, -3 } }; 373{ NULL, -3 } };
373static ot_keywords keywords_announce_event[] = { { "completed", 1 }, { "stopped", 2 }, { NULL, -3 } }; 374static ot_keywords keywords_announce_event[] = { { "completed", 1 }, { "stopped", 2 }, { NULL, -3 } };
374static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, char *read_ptr ) { 375static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, char *read_ptr ) {
375 int numwant, tmp, scanon; 376 int numwant, tmp, scanon;
376 ot_peer peer;
377 ot_hash *hash = NULL;
378 unsigned short port = 0; 377 unsigned short port = 0;
379 char *write_ptr; 378 char *write_ptr;
380 ssize_t len; 379 ssize_t len;
@@ -392,14 +391,18 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws,
392 ot_ip6 proxied_ip; 391 ot_ip6 proxied_ip;
393 char *fwd = http_header( ws->request, ws->header_size, "x-forwarded-for" ); 392 char *fwd = http_header( ws->request, ws->header_size, "x-forwarded-for" );
394 if( fwd && scan_ip6( fwd, proxied_ip ) ) 393 if( fwd && scan_ip6( fwd, proxied_ip ) )
395 OT_SETIP( &peer, proxied_ip ); 394 OT_SETIP( &ws->peer, proxied_ip );
396 else 395 else
397 OT_SETIP( &peer, cookie->ip ); 396 OT_SETIP( &ws->peer, cookie->ip );
398 } else 397 } else
399#endif 398#endif
400 OT_SETIP( &peer, cookie->ip ); 399
401 OT_SETPORT( &peer, &port ); 400 ws->peer_id = NULL;
402 OT_PEERFLAG( &peer ) = 0; 401 ws->hash = NULL;
402
403 OT_SETIP( &ws->peer, cookie->ip );
404 OT_SETPORT( &ws->peer, &port );
405 OT_PEERFLAG( &ws->peer ) = 0;
403 numwant = 50; 406 numwant = 50;
404 scanon = 1; 407 scanon = 1;
405 408
@@ -411,21 +414,21 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws,
411 case 1: /* matched "port" */ 414 case 1: /* matched "port" */
412 len = scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ); 415 len = scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE );
413 if( ( len <= 0 ) || scan_fixed_int( write_ptr, len, &tmp ) || ( tmp > 0xffff ) ) HTTPERROR_400_PARAM; 416 if( ( len <= 0 ) || scan_fixed_int( write_ptr, len, &tmp ) || ( tmp > 0xffff ) ) HTTPERROR_400_PARAM;
414 port = htons( tmp ); OT_SETPORT( &peer, &port ); 417 port = htons( tmp ); OT_SETPORT( &ws->peer, &port );
415 break; 418 break;
416 case 2: /* matched "left" */ 419 case 2: /* matched "left" */
417 if( ( len = scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) ) <= 0 ) HTTPERROR_400_PARAM; 420 if( ( len = scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) ) <= 0 ) HTTPERROR_400_PARAM;
418 if( scan_fixed_int( write_ptr, len, &tmp ) ) tmp = 0; 421 if( scan_fixed_int( write_ptr, len, &tmp ) ) tmp = 0;
419 if( !tmp ) OT_PEERFLAG( &peer ) |= PEER_FLAG_SEEDING; 422 if( !tmp ) OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_SEEDING;
420 break; 423 break;
421 case 3: /* matched "event" */ 424 case 3: /* matched "event" */
422 switch( scan_find_keywords( keywords_announce_event, &read_ptr, SCAN_SEARCHPATH_VALUE ) ) { 425 switch( scan_find_keywords( keywords_announce_event, &read_ptr, SCAN_SEARCHPATH_VALUE ) ) {
423 case -1: HTTPERROR_400_PARAM; 426 case -1: HTTPERROR_400_PARAM;
424 case 1: /* matched "completed" */ 427 case 1: /* matched "completed" */
425 OT_PEERFLAG( &peer ) |= PEER_FLAG_COMPLETED; 428 OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_COMPLETED;
426 break; 429 break;
427 case 2: /* matched "stopped" */ 430 case 2: /* matched "stopped" */
428 OT_PEERFLAG( &peer ) |= PEER_FLAG_STOPPED; 431 OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_STOPPED;
429 break; 432 break;
430 default: 433 default:
431 break; 434 break;
@@ -443,10 +446,10 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws,
443 if( !tmp ) HTTPERROR_400_COMPACT; 446 if( !tmp ) HTTPERROR_400_COMPACT;
444 break; 447 break;
445 case 6: /* matched "info_hash" */ 448 case 6: /* matched "info_hash" */
446 if( hash ) HTTPERROR_400_DOUBLEHASH; 449 if( ws->hash ) HTTPERROR_400_DOUBLEHASH;
447 /* ignore this, when we have less than 20 bytes */ 450 /* ignore this, when we have less than 20 bytes */
448 if( scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) != 20 ) HTTPERROR_400_PARAM; 451 if( scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) != 20 ) HTTPERROR_400_PARAM;
449 hash = (ot_hash*)write_ptr; 452 ws->hash = (ot_hash*)write_ptr;
450 break; 453 break;
451#ifdef WANT_IP_FROM_QUERY_STRING 454#ifdef WANT_IP_FROM_QUERY_STRING
452 case 7: /* matched "ip" */ 455 case 7: /* matched "ip" */
@@ -455,7 +458,7 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws,
455 len = scan_urlencoded_query( &read_ptr, tmp_buf2, SCAN_SEARCHPATH_VALUE ); 458 len = scan_urlencoded_query( &read_ptr, tmp_buf2, SCAN_SEARCHPATH_VALUE );
456 tmp_buf2[len] = 0; 459 tmp_buf2[len] = 0;
457 if( ( len <= 0 ) || !scan_ip6( tmp_buf2, tmp_buf1 ) ) HTTPERROR_400_PARAM; 460 if( ( len <= 0 ) || !scan_ip6( tmp_buf2, tmp_buf1 ) ) HTTPERROR_400_PARAM;
458 OT_SETIP( &peer, tmp_buf1 ); 461 OT_SETIP( &ws->peer, tmp_buf1 );
459 } 462 }
460 break; 463 break;
461#endif 464#endif
@@ -490,6 +493,12 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws,
490 //} 493 //}
491 } 494 }
492#endif 495#endif
496 break;
497 case 9: /* matched "peer_id" */
498 /* ignore this, when we have less than 20 bytes */
499 if( scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) != 20 ) HTTPERROR_400_PARAM;
500 ws->peer_id = write_ptr;
501 break;
493 } 502 }
494 } 503 }
495 504
@@ -501,13 +510,13 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws,
501 stats_issue_event( EVENT_ACCEPT, FLAG_TCP, (uintptr_t)ws->reply ); 510 stats_issue_event( EVENT_ACCEPT, FLAG_TCP, (uintptr_t)ws->reply );
502 511
503 /* Scanned whole query string */ 512 /* Scanned whole query string */
504 if( !hash ) 513 if( !ws->hash )
505 return ws->reply_size = sprintf( ws->reply, "d14:failure reason80:Your client forgot to send your torrent's info_hash. Please upgrade your client.e" ); 514 return ws->reply_size = sprintf( ws->reply, "d14:failure reason80:Your client forgot to send your torrent's info_hash. Please upgrade your client.e" );
506 515
507 if( OT_PEERFLAG( &peer ) & PEER_FLAG_STOPPED ) 516 if( OT_PEERFLAG( &ws->peer ) & PEER_FLAG_STOPPED )
508 ws->reply_size = remove_peer_from_torrent( *hash, &peer, ws->reply, FLAG_TCP ); 517 ws->reply_size = remove_peer_from_torrent( FLAG_TCP, ws );
509 else 518 else
510 ws->reply_size = add_peer_to_torrent_and_return_peers( *hash, &peer, FLAG_TCP, numwant, ws->reply ); 519 ws->reply_size = add_peer_to_torrent_and_return_peers( FLAG_TCP, ws, numwant );
511 520
512 stats_issue_event( EVENT_ANNOUNCE, FLAG_TCP, ws->reply_size); 521 stats_issue_event( EVENT_ANNOUNCE, FLAG_TCP, ws->reply_size);
513 return ws->reply_size; 522 return ws->reply_size;
diff --git a/ot_livesync.c b/ot_livesync.c
index 9e1c723..87fe5cf 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -33,23 +33,9 @@ char groupip_1[4] = { 224,0,23,5 };
33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
35 35
36#ifdef WANT_SYNC_SCRAPE
37#define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504
38#define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t))
39#define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100
40
41#define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */
42#define LIVESYNC_BEACON_INTERVAL 60 /* seconds */
43#define LIVESYNC_INQUIRE_THRESH 0.75
44#endif /* WANT_SYNC_SCRAPE */
45
46#define LIVESYNC_MAXDELAY 15 /* seconds */ 36#define LIVESYNC_MAXDELAY 15 /* seconds */
47 37
48enum { OT_SYNC_PEER 38enum { OT_SYNC_PEER };
49#ifdef WANT_SYNC_SCRAPE
50 , OT_SYNC_SCRAPE_BEACON, OT_SYNC_SCRAPE_INQUIRE, OT_SYNC_SCRAPE_TELL
51#endif
52};
53 39
54/* Forward declaration */ 40/* Forward declaration */
55static void * livesync_worker( void * args ); 41static void * livesync_worker( void * args );
@@ -59,52 +45,24 @@ static int64 g_socket_in = -1;
59 45
60/* For incoming packets */ 46/* For incoming packets */
61static int64 g_socket_out = -1; 47static int64 g_socket_out = -1;
62static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
63
64static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
65static uint8_t *g_peerbuffer_pos;
66static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
67 48
49char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
50static size_t g_outbuf_data;
68static ot_time g_next_packet_time; 51static ot_time g_next_packet_time;
69 52
70#ifdef WANT_SYNC_SCRAPE
71/* Live sync scrape buffers, states and timers */
72static ot_time g_next_beacon_time;
73static ot_time g_next_inquire_time;
74
75static uint8_t g_scrapebuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE];
76static uint8_t *g_scrapebuffer_pos;
77static uint8_t *g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE;
78
79static size_t g_inquire_remote_count;
80static uint32_t g_inquire_remote_host;
81static int g_inquire_inprogress;
82static int g_inquire_bucket;
83#endif /* WANT_SYNC_SCRAPE */
84
85static pthread_t thread_id; 53static pthread_t thread_id;
86void livesync_init( ) { 54void livesync_init( ) {
55
87 if( g_socket_in == -1 ) 56 if( g_socket_in == -1 )
88 exerr( "No socket address for live sync specified." ); 57 exerr( "No socket address for live sync specified." );
89 58
90 /* Prepare outgoing peers buffer */ 59 /* Prepare outgoing peers buffer */
91 g_peerbuffer_pos = g_peerbuffer_start; 60 memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) );
92 memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 61 uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER);
93 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); 62 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
94 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
95
96#ifdef WANT_SYNC_SCRAPE
97 /* Prepare outgoing scrape buffer */
98 g_scrapebuffer_pos = g_scrapebuffer_start;
99 memcpy( g_scrapebuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
100 uint32_pack_big( (char*)g_scrapebuffer_pos + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_TELL);
101 g_scrapebuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
102
103 /* Wind up timers for inquires */
104 g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY;
105#endif /* WANT_SYNC_SCRAPE */
106 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
107 63
64 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
65
108 pthread_create( &thread_id, NULL, livesync_worker, NULL ); 66 pthread_create( &thread_id, NULL, livesync_worker, NULL );
109} 67}
110 68
@@ -148,264 +106,86 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
148} 106}
149 107
150static void livesync_issue_peersync( ) { 108static void livesync_issue_peersync( ) {
151 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, 109 socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT);
152 groupip_1, LIVESYNC_PORT); 110 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
153 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
154 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 111 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
155} 112}
156 113
157static void livesync_handle_peersync( ssize_t datalen ) { 114static void livesync_handle_peersync( struct ot_workstruct *ws ) {
158 int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 115 int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
159 116
160 /* Now basic sanity checks have been done on the live sync packet 117 /* Now basic sanity checks have been done on the live sync packet
161 We might add more testing and logging. */ 118 We might add more testing and logging. */
162 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { 119 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= ws->request_size ) {
163 ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); 120 memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), sizeof( ot_peer ) );
164 ot_hash *hash = (ot_hash*)(g_inbuffer + off); 121 ws->hash = (ot_hash*)(ws->request + off);
165 122
166 if( !g_opentracker_running ) return; 123 if( !g_opentracker_running ) return;
167 124
168 if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) 125 if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_STOPPED )
169 remove_peer_from_torrent( *hash, peer, NULL, FLAG_MCA ); 126 remove_peer_from_torrent( FLAG_MCA, ws );
170 else 127 else
171 add_peer_to_torrent( *hash, peer, FLAG_MCA ); 128 add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 );
172 129
173 off += sizeof( ot_hash ) + sizeof( ot_peer ); 130 off += sizeof( ot_hash ) + sizeof( ot_peer );
174 } 131 }
175 132
176 stats_issue_event(EVENT_SYNC, 0, 133 stats_issue_event(EVENT_SYNC, 0,
177 (datalen - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / 134 (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
178 ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); 135 ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer )));
179} 136}
180 137
181#ifdef WANT_SYNC_SCRAPE
182void livesync_issue_beacon( ) {
183 size_t torrent_count = mutex_get_torrent_count();
184 uint8_t beacon[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ];
185
186 memcpy( beacon, &g_tracker_id, sizeof( g_tracker_id ) );
187 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_BEACON);
188 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + sizeof(uint32_t), (uint32_t)((uint64_t)(torrent_count)>>32) );
189 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + 2 * sizeof(uint32_t), (uint32_t)torrent_count );
190
191 socket_send4(g_socket_out, (char*)beacon, sizeof(beacon), groupip_1, LIVESYNC_PORT);
192}
193
194void livesync_handle_beacon( ssize_t datalen ) {
195 size_t torrent_count_local, torrent_count_remote;
196 if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) )
197 return;
198 torrent_count_local = mutex_get_torrent_count();
199 torrent_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + sizeof(uint32_t))) << 32);
200 torrent_count_remote |= (size_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + 2 * sizeof(uint32_t));
201
202 /* Empty tracker is useless */
203 if( !torrent_count_remote ) return;
204
205 if( ((double)torrent_count_local ) / ((double)torrent_count_remote) < LIVESYNC_INQUIRE_THRESH) {
206 if( !g_next_inquire_time ) {
207 g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL;
208 g_inquire_remote_count = 0;
209 }
210
211 if( torrent_count_remote > g_inquire_remote_count ) {
212 g_inquire_remote_count = torrent_count_remote;
213 memcpy( &g_inquire_remote_host, g_inbuffer, sizeof( g_tracker_id ) );
214 }
215 }
216}
217
218void livesync_issue_inquire( ) {
219 uint8_t inquire[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id)];
220
221 memcpy( inquire, &g_tracker_id, sizeof( g_tracker_id ) );
222 uint32_pack_big( (char*)inquire + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_INQUIRE);
223 memcpy( inquire + sizeof(g_tracker_id) + sizeof(uint32_t), &g_inquire_remote_host, sizeof( g_tracker_id ) );
224
225 socket_send4(g_socket_out, (char*)inquire, sizeof(inquire), groupip_1, LIVESYNC_PORT);
226}
227
228void livesync_handle_inquire( ssize_t datalen ) {
229 if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id) )
230 return;
231
232 /* If it isn't us, they're inquiring, ignore inquiry */
233 if( memcmp( &g_tracker_id, g_inbuffer, sizeof( g_tracker_id ) ) )
234 return;
235
236 /* Start scrape tell on next ticker */
237 if( !g_inquire_inprogress ) {
238 g_inquire_inprogress = 1;
239 g_inquire_bucket = 0;
240 }
241}
242
243void livesync_issue_tell( ) {
244 int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE;
245 while( packets_to_send > 0 && g_inquire_bucket < OT_BUCKET_COUNT ) {
246 ot_vector *torrents_list = mutex_bucket_lock( g_inquire_bucket );
247 unsigned int j;
248 for( j=0; j<torrents_list->size; ++j ) {
249 ot_torrent *torrent = (ot_torrent*)(torrents_list->data) + j;
250 memcpy(g_scrapebuffer_pos, torrent->hash, sizeof(ot_hash));
251 g_scrapebuffer_pos += sizeof(ot_hash);
252 uint32_pack_big( (char*)g_scrapebuffer_pos , (uint32_t)(g_now_minutes - torrent->peer_list->base ));
253 uint32_pack_big( (char*)g_scrapebuffer_pos + 4, (uint32_t)((uint64_t)(torrent->peer_list->down_count)>>32) );
254 uint32_pack_big( (char*)g_scrapebuffer_pos + 8, (uint32_t)torrent->peer_list->down_count );
255 g_scrapebuffer_pos += 12;
256
257 if( g_scrapebuffer_pos >= g_scrapebuffer_highwater ) {
258 socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT);
259 g_scrapebuffer_pos = g_scrapebuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t);
260 --packets_to_send;
261 }
262 }
263 mutex_bucket_unlock( g_inquire_bucket++, 0 );
264 if( !g_opentracker_running )
265 return;
266 }
267 if( g_inquire_bucket == OT_BUCKET_COUNT ) {
268 socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT);
269 g_inquire_inprogress = 0;
270 }
271}
272
273void livesync_handle_tell( ssize_t datalen ) {
274 int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
275
276 /* Some instance is in progress of telling. Our inquiry was successful.
277 Don't ask again until we see next beacon. */
278 g_next_inquire_time = 0;
279
280 /* Don't cause any new inquiries during another tracker's tell */
281 if( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL )
282 g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
283
284 while( off + sizeof(ot_hash) + 12 <= (size_t)datalen ) {
285 ot_hash *hash = (ot_hash*)(g_inbuffer+off);
286 ot_vector *torrents_list = mutex_bucket_lock_by_hash(*hash);
287 size_t down_count_remote;
288 int exactmatch;
289 ot_torrent *torrent = vector_find_or_insert(torrents_list, hash, sizeof(ot_hash), OT_HASH_COMPARE_SIZE, &exactmatch);
290
291 if( !torrent ) {
292 mutex_bucket_unlock_by_hash( *hash, 0 );
293 continue;
294 }
295
296 if( !exactmatch ) {
297 /* Create a new torrent entry, then */
298 memcpy( &torrent->hash, hash, sizeof(ot_hash));
299
300 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
301 vector_remove_torrent( torrents_list, torrent );
302 mutex_bucket_unlock_by_hash( *hash, 0 );
303 continue;
304 }
305
306 byte_zero( torrent->peer_list, sizeof( ot_peerlist ) );
307 torrent->peer_list->base = g_now_minutes - uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash));
308 }
309
310 down_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + sizeof(uint32_t))) << 32);
311 down_count_remote |= (size_t) uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + 2 * sizeof(uint32_t));
312
313 if( down_count_remote > torrent->peer_list->down_count )
314 torrent->peer_list->down_count = down_count_remote;
315 /* else
316 We might think of sending a tell packet, if we have a much larger downloaded count
317 */
318
319 mutex_bucket_unlock( g_inquire_bucket++, exactmatch?0:1 );
320 if( !g_opentracker_running )
321 return;
322 off += sizeof(ot_hash) + 12;
323 }
324}
325#endif /* WANT_SYNC_SCRAPE */
326
327/* Tickle the live sync module from time to time, so no events get 138/* Tickle the live sync module from time to time, so no events get
328 stuck when there's not enough traffic to fill udp packets fast 139 stuck when there's not enough traffic to fill udp packets fast
329 enough */ 140 enough */
330void livesync_ticker( ) { 141void livesync_ticker( ) {
331
332 /* livesync_issue_peersync sets g_next_packet_time */ 142 /* livesync_issue_peersync sets g_next_packet_time */
333 if( g_now_seconds > g_next_packet_time && 143 if( g_now_seconds > g_next_packet_time &&
334 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) 144 g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
335 livesync_issue_peersync(); 145 livesync_issue_peersync();
336
337#ifdef WANT_SYNC_SCRAPE
338 /* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY
339 seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */
340 if( g_now_seconds > g_next_beacon_time ) {
341 livesync_issue_beacon( );
342 g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
343 }
344
345 /* If we're interested in an inquiry and waited long enough to see all
346 tracker's beacons, go ahead and inquire */
347 if( g_next_inquire_time && g_now_seconds > g_next_inquire_time ) {
348 livesync_issue_inquire();
349
350 /* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */
351 g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
352 }
353
354 /* If we're in process of telling, let's tell. */
355 if( g_inquire_inprogress )
356 livesync_issue_tell( );
357
358#endif /* WANT_SYNC_SCRAPE */
359} 146}
360 147
361/* Inform live sync about whats going on. */ 148/* Inform live sync about whats going on. */
362void livesync_tell( ot_hash const info_hash, const ot_peer * const peer ) { 149void livesync_tell( struct ot_workstruct *ws ) {
363 150
364 memcpy( g_peerbuffer_pos, info_hash, sizeof(ot_hash) ); 151 memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) );
365 memcpy( g_peerbuffer_pos+sizeof(ot_hash), peer, sizeof(ot_peer) ); 152 memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) );
366 153
367 g_peerbuffer_pos += sizeof(ot_hash)+sizeof(ot_peer); 154 g_outbuf_data += sizeof(ot_hash) + sizeof(ot_peer);
368 155
369 if( g_peerbuffer_pos >= g_peerbuffer_highwater ) 156 if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
370 livesync_issue_peersync(); 157 livesync_issue_peersync();
371} 158}
372 159
373static void * livesync_worker( void * args ) { 160static void * livesync_worker( void * args ) {
161 struct ot_workstruct ws;
374 ot_ip6 in_ip; uint16_t in_port; 162 ot_ip6 in_ip; uint16_t in_port;
375 ssize_t datalen;
376 163
377 (void)args; 164 (void)args;
378 165
166 /* Initialize our "thread local storage" */
167 ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE );
168 ws.outbuf = ws.reply = 0;
169
379 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); 170 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) );
380 171
381 while( 1 ) { 172 while( 1 ) {
382 datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 173 ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
383 174
384 /* Expect at least tracker id and packet type */ 175 /* Expect at least tracker id and packet type */
385 if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) 176 if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
386 continue; 177 continue;
387 if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) 178 if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC))
388 continue; 179 continue;
389 if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { 180 if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
390 /* TODO: log packet coming from ourselves */ 181 /* TODO: log packet coming from ourselves */
391 continue; 182 continue;
392 } 183 }
393 184
394 switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { 185 switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) {
395 case OT_SYNC_PEER: 186 case OT_SYNC_PEER:
396 livesync_handle_peersync( datalen ); 187 livesync_handle_peersync( &ws );
397 break;
398#ifdef WANT_SYNC_SCRAPE
399 case OT_SYNC_SCRAPE_BEACON:
400 livesync_handle_beacon( datalen );
401 break;
402 case OT_SYNC_SCRAPE_INQUIRE:
403 livesync_handle_inquire( datalen );
404 break;
405 case OT_SYNC_SCRAPE_TELL:
406 livesync_handle_tell( datalen );
407 break; 188 break;
408#endif /* WANT_SYNC_SCRAPE */
409 default: 189 default:
410 break; 190 break;
411 } 191 }
diff --git a/ot_livesync.h b/ot_livesync.h
index fe9d122..1a3ed45 100644
--- a/ot_livesync.h
+++ b/ot_livesync.h
@@ -35,44 +35,6 @@
35 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) 35 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
36 ]* 36 ]*
37 37
38 ########
39 ######## SCRAPE SYNC PROTOCOL ########
40 ########
41
42 Each tracker instance SHOULD broadcast a beacon every LIVESYNC_BEACON_INTERVAL
43 seconds after running at least LIVESYNC_FIRST_BEACON_DELAY seconds:
44
45 packet type SYNC_SCRAPE_BEACON
46 [ 0x0008 0x08 amount of torrents served
47 ]
48
49 If a tracker instance receives a beacon from another instance that has more than
50 its torrent count plus a threshold, it inquires for a scrape. It must wait for at
51 least 2 * LIVESYNC_BEACON_INTERVAL seconds in order to inspect beacons from all
52 tracker instances and inquire only the one with most torrents.
53
54 If it sees a SYNC_SCRAPE_TELL within that time frame, it's likely, that another
55 scrape sync is going on. It should reset its state to needs no inquiry. It should
56 be reenabled on the next beacon, if still needed.
57
58 packet type SYNC_SCRAPE_INQUIRE
59 [ 0x0008 0x04 id of tracker instance to inquire
60 ]
61
62 The inquired tracker instance answers with as many scrape tell packets it needs
63 to deliver stats about all its torrents
64
65 packet type SYNC_SCRAPE_TELL
66 [ 0x0008 0x14 info_hash
67 0x001c 0x04 base offset (i.e. when was it last announced, in minutes)
68 0x0020 0x08 downloaded count
69 ]*
70
71 Each tracker instance that receives a SYNC_SCRAPE_TELL, looks up each torrent and
72 compares downloaded count with its own counter. It can send out its own scrape
73 tell packets, if it knows more. However to not interrupt a scrape tell, a tracker
74 should wait LIVESYNC_BEACON_INTERVAL after receiving a scrape tell.
75
76*/ 38*/
77 39
78#ifdef WANT_SYNC_LIVE 40#ifdef WANT_SYNC_LIVE
@@ -86,7 +48,7 @@ void livesync_deinit();
86void livesync_bind_mcast( char *ip, uint16_t port ); 48void livesync_bind_mcast( char *ip, uint16_t port );
87 49
88/* Inform live sync about whats going on. */ 50/* Inform live sync about whats going on. */
89void livesync_tell( ot_hash const info_hash, const ot_peer * const peer ); 51void livesync_tell( struct ot_workstruct *ws );
90 52
91/* Tickle the live sync module from time to time, so no events get 53/* Tickle the live sync module from time to time, so no events get
92 stuck when there's not enough traffic to fill udp packets fast 54 stuck when there's not enough traffic to fill udp packets fast
diff --git a/ot_stats.c b/ot_stats.c
index 43ab8fd..b6469f9 100644
--- a/ot_stats.c
+++ b/ot_stats.c
@@ -642,8 +642,9 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event
642 case EVENT_COMPLETED: 642 case EVENT_COMPLETED:
643#ifdef WANT_SYSLOGS 643#ifdef WANT_SYSLOGS
644 if( event_data) { 644 if( event_data) {
645 struct ot_workstruct *ws = (struct ot_workstruct *)event_data;
645 char timestring[64]; 646 char timestring[64];
646 char hex_out[42]; 647 char hash_hex[42], peerid_hex[42], ip_readable[64];
647 struct tm time_now; 648 struct tm time_now;
648 time_t ttt; 649 time_t ttt;
649 650
@@ -651,8 +652,19 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event
651 localtime_r( &ttt, &time_now ); 652 localtime_r( &ttt, &time_now );
652 strftime( timestring, sizeof( timestring ), "%FT%T%z", &time_now ); 653 strftime( timestring, sizeof( timestring ), "%FT%T%z", &time_now );
653 654
654 to_hex( hex_out, (uint8_t*)event_data ); 655 to_hex( hash_hex, *ws->hash );
655 syslog( LOG_INFO, "time=%s event=completed info_hash=%s", timestring, hex_out ); 656 if( ws->peer_id )
657 to_hex( peerid_hex, (uint8_t*)ws->peer_id );
658 else {
659 *peerid_hex=0;
660 }
661
662#ifdef WANT_V6
663 ip_readable[ fmt_ip6c( ip_readable, (char*)&ws->peer ) ] = 0;
664#else
665 ip_readable[ fmt_ip4( ip_readable, (char*)&ws->peer ) ] = 0;
666#endif
667 syslog( LOG_INFO, "time=%s event=completed info_hash=%s peer_id=%s ip=%s", timestring, hash_hex, peerid_hex, ip_readable );
656 } 668 }
657#endif 669#endif
658 ot_overall_completed++; 670 ot_overall_completed++;
diff --git a/ot_udp.c b/ot_udp.c
index a95a4fa..bc2d6cc 100644
--- a/ot_udp.c
+++ b/ot_udp.c
@@ -29,8 +29,6 @@ static void udp_make_connectionid( uint32_t * connid, const ot_ip6 remoteip ) {
29 29
30/* UDP implementation according to http://xbtt.sourceforge.net/udp_tracker_protocol.html */ 30/* UDP implementation according to http://xbtt.sourceforge.net/udp_tracker_protocol.html */
31void handle_udp6( int64 serversocket, struct ot_workstruct *ws ) { 31void handle_udp6( int64 serversocket, struct ot_workstruct *ws ) {
32 ot_peer peer;
33 ot_hash *hash = NULL;
34 ot_ip6 remoteip; 32 ot_ip6 remoteip;
35 uint32_t *inpacket = (uint32_t*)ws->inbuf; 33 uint32_t *inpacket = (uint32_t*)ws->inbuf;
36 uint32_t *outpacket = (uint32_t*)ws->outbuf; 34 uint32_t *outpacket = (uint32_t*)ws->outbuf;
@@ -43,6 +41,10 @@ void handle_udp6( int64 serversocket, struct ot_workstruct *ws ) {
43 stats_issue_event( EVENT_ACCEPT, FLAG_UDP, (uintptr_t)remoteip ); 41 stats_issue_event( EVENT_ACCEPT, FLAG_UDP, (uintptr_t)remoteip );
44 stats_issue_event( EVENT_READ, FLAG_UDP, byte_count ); 42 stats_issue_event( EVENT_READ, FLAG_UDP, byte_count );
45 43
44 /* Initialise hash pointer */
45 ws->hash = NULL;
46 ws->peer_id = NULL;
47
46 /* Minimum udp tracker packet size, also catches error */ 48 /* Minimum udp tracker packet size, also catches error */
47 if( byte_count < 16 ) 49 if( byte_count < 16 )
48 return; 50 return;
@@ -71,33 +73,36 @@ void handle_udp6( int64 serversocket, struct ot_workstruct *ws ) {
71 numwant = ntohl( inpacket[92/4] ); 73 numwant = ntohl( inpacket[92/4] );
72 if (numwant > 200) numwant = 200; 74 if (numwant > 200) numwant = 200;
73 75
74 event = ntohl( inpacket[80/4] ); 76 event = ntohl( inpacket[80/4] );
75 port = *(uint16_t*)( ((char*)inpacket) + 96 ); 77 port = *(uint16_t*)( ((char*)inpacket) + 96 );
76 hash = (ot_hash*)( ((char*)inpacket) + 16 ); 78 ws->hash = (ot_hash*)( ((char*)inpacket) + 16 );
77 79
78 OT_SETIP( &peer, remoteip ); 80 OT_SETIP( &ws->peer, remoteip );
79 OT_SETPORT( &peer, &port ); 81 OT_SETPORT( &ws->peer, &port );
80 OT_PEERFLAG( &peer ) = 0; 82 OT_PEERFLAG( &ws->peer ) = 0;
81 83
82 switch( event ) { 84 switch( event ) {
83 case 1: OT_PEERFLAG( &peer ) |= PEER_FLAG_COMPLETED; break; 85 case 1: OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_COMPLETED; break;
84 case 3: OT_PEERFLAG( &peer ) |= PEER_FLAG_STOPPED; break; 86 case 3: OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_STOPPED; break;
85 default: break; 87 default: break;
86 } 88 }
87 89
88 if( !left ) 90 if( !left )
89 OT_PEERFLAG( &peer ) |= PEER_FLAG_SEEDING; 91 OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_SEEDING;
90 92
91 outpacket[0] = htonl( 1 ); /* announce action */ 93 outpacket[0] = htonl( 1 ); /* announce action */
92 outpacket[1] = inpacket[12/4]; 94 outpacket[1] = inpacket[12/4];
93 95
94 if( OT_PEERFLAG( &peer ) & PEER_FLAG_STOPPED ) /* Peer is gone. */ 96 if( OT_PEERFLAG( &ws->peer ) & PEER_FLAG_STOPPED ) { /* Peer is gone. */
95 byte_count = remove_peer_from_torrent( *hash, &peer, ws->outbuf, FLAG_UDP ); 97 ws->reply = ws->outbuf;
96 else 98 ws->reply_size = remove_peer_from_torrent( FLAG_UDP, ws );
97 byte_count = 8 + add_peer_to_torrent_and_return_peers( *hash, &peer, FLAG_UDP, numwant, ((char*)outpacket) + 8 ); 99 } else {
100 ws->reply = ws->outbuf + 8;
101 ws->reply_size = 8 + add_peer_to_torrent_and_return_peers( FLAG_UDP, ws, numwant );
102 }
98 103
99 socket_send6( serversocket, ws->outbuf, byte_count, remoteip, remoteport, 0 ); 104 socket_send6( serversocket, ws->outbuf, ws->reply_size, remoteip, remoteport, 0 );
100 stats_issue_event( EVENT_ANNOUNCE, FLAG_UDP, byte_count ); 105 stats_issue_event( EVENT_ANNOUNCE, FLAG_UDP, ws->reply_size );
101 break; 106 break;
102 107
103 case 2: /* This is a scrape action */ 108 case 2: /* This is a scrape action */
diff --git a/trackerlogic.c b/trackerlogic.c
index 5348927..7ae9bb1 100644
--- a/trackerlogic.c
+++ b/trackerlogic.c
@@ -71,36 +71,35 @@ void add_torrent_from_saved_state( ot_hash hash, ot_time base, size_t down_count
71 return mutex_bucket_unlock_by_hash( hash, 1 ); 71 return mutex_bucket_unlock_by_hash( hash, 1 );
72} 72}
73 73
74size_t add_peer_to_torrent_and_return_peers( ot_hash hash, ot_peer *peer, PROTO_FLAG proto, size_t amount, char * reply ) { 74size_t add_peer_to_torrent_and_return_peers( PROTO_FLAG proto, struct ot_workstruct *ws, size_t amount ) {
75 int exactmatch, delta_torrentcount = 0; 75 int exactmatch, delta_torrentcount = 0;
76 size_t reply_size;
77 ot_torrent *torrent; 76 ot_torrent *torrent;
78 ot_peer *peer_dest; 77 ot_peer *peer_dest;
79 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 78 ot_vector *torrents_list = mutex_bucket_lock_by_hash( *ws->hash );
80 79
81 if( !accesslist_hashisvalid( hash ) ) { 80 if( !accesslist_hashisvalid( *ws->hash ) ) {
82 mutex_bucket_unlock_by_hash( hash, 0 ); 81 mutex_bucket_unlock_by_hash( *ws->hash, 0 );
83 if( proto == FLAG_TCP ) { 82 if( proto == FLAG_TCP ) {
84 const char invalid_hash[] = "d14:failure reason63:Requested download is not authorized for use with this tracker.e"; 83 const char invalid_hash[] = "d14:failure reason63:Requested download is not authorized for use with this tracker.e";
85 memcpy( reply, invalid_hash, strlen( invalid_hash ) ); 84 memcpy( ws->reply, invalid_hash, strlen( invalid_hash ) );
86 return strlen( invalid_hash ); 85 return strlen( invalid_hash );
87 } 86 }
88 return 0; 87 return 0;
89 } 88 }
90 89
91 torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); 90 torrent = vector_find_or_insert( torrents_list, (void*)ws->hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
92 if( !torrent ) { 91 if( !torrent ) {
93 mutex_bucket_unlock_by_hash( hash, 0 ); 92 mutex_bucket_unlock_by_hash( *ws->hash, 0 );
94 return 0; 93 return 0;
95 } 94 }
96 95
97 if( !exactmatch ) { 96 if( !exactmatch ) {
98 /* Create a new torrent entry, then */ 97 /* Create a new torrent entry, then */
99 memcpy( torrent->hash, hash, sizeof(ot_hash) ); 98 memcpy( torrent->hash, *ws->hash, sizeof(ot_hash) );
100 99
101 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { 100 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
102 vector_remove_torrent( torrents_list, torrent ); 101 vector_remove_torrent( torrents_list, torrent );
103 mutex_bucket_unlock_by_hash( hash, 0 ); 102 mutex_bucket_unlock_by_hash( *ws->hash, 0 );
104 return 0; 103 return 0;
105 } 104 }
106 105
@@ -112,76 +111,76 @@ size_t add_peer_to_torrent_and_return_peers( ot_hash hash, ot_peer *peer, PROTO_
112 torrent->peer_list->base = g_now_minutes; 111 torrent->peer_list->base = g_now_minutes;
113 112
114 /* Check for peer in torrent */ 113 /* Check for peer in torrent */
115 peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); 114 peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), &ws->peer, &exactmatch );
116 if( !peer_dest ) { 115 if( !peer_dest ) {
117 mutex_bucket_unlock_by_hash( hash, delta_torrentcount ); 116 mutex_bucket_unlock_by_hash( *ws->hash, delta_torrentcount );
118 return 0; 117 return 0;
119 } 118 }
120 119
121 /* Tell peer that it's fresh */ 120 /* Tell peer that it's fresh */
122 OT_PEERTIME( peer ) = 0; 121 OT_PEERTIME( &ws->peer ) = 0;
123 122
124 /* Sanitize flags: Whoever claims to have completed download, must be a seeder */ 123 /* Sanitize flags: Whoever claims to have completed download, must be a seeder */
125 if( ( OT_PEERFLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED ) 124 if( ( OT_PEERFLAG( &ws->peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED )
126 OT_PEERFLAG( peer ) ^= PEER_FLAG_COMPLETED; 125 OT_PEERFLAG( &ws->peer ) ^= PEER_FLAG_COMPLETED;
127 126
128 /* If we hadn't had a match create peer there */ 127 /* If we hadn't had a match create peer there */
129 if( !exactmatch ) { 128 if( !exactmatch ) {
130 129
131#ifdef WANT_SYNC_LIVE 130#ifdef WANT_SYNC_LIVE
132 if( proto == FLAG_MCA ) 131 if( proto == FLAG_MCA )
133 OT_PEERFLAG( peer ) |= PEER_FLAG_FROM_SYNC; 132 OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_FROM_SYNC;
134 else 133 else
135 livesync_tell( hash, peer ); 134 livesync_tell( ws );
136#endif 135#endif
137 136
138 torrent->peer_list->peer_count++; 137 torrent->peer_list->peer_count++;
139 if( OT_PEERFLAG(peer) & PEER_FLAG_COMPLETED ) { 138 if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_COMPLETED ) {
140 torrent->peer_list->down_count++; 139 torrent->peer_list->down_count++;
141 stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)torrent->hash ); 140 stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)ws );
142 } 141 }
143 if( OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) 142 if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_SEEDING )
144 torrent->peer_list->seed_count++; 143 torrent->peer_list->seed_count++;
145 144
146 } else { 145 } else {
147 stats_issue_event( EVENT_RENEW, 0, OT_PEERTIME( peer_dest ) ); 146 stats_issue_event( EVENT_RENEW, 0, OT_PEERTIME( peer_dest ) );
148#ifdef WANT_SPOT_WOODPECKER 147#ifdef WANT_SPOT_WOODPECKER
149 if( ( OT_PEERTIME(peer_dest) > 0 ) && ( OT_PEERTIME(peer_dest) < 20 ) ) 148 if( ( OT_PEERTIME(peer_dest) > 0 ) && ( OT_PEERTIME(peer_dest) < 20 ) )
150 stats_issue_event( EVENT_WOODPECKER, 0, (uintptr_t)peer ); 149 stats_issue_event( EVENT_WOODPECKER, 0, (uintptr_t)&ws->peer );
151#endif 150#endif
152#ifdef WANT_SYNC_LIVE 151#ifdef WANT_SYNC_LIVE
153 /* Won't live sync peers that come back too fast. Only exception: 152 /* Won't live sync peers that come back too fast. Only exception:
154 fresh "completed" reports */ 153 fresh "completed" reports */
155 if( proto != FLAG_MCA ) { 154 if( proto != FLAG_MCA ) {
156 if( OT_PEERTIME( peer_dest ) > OT_CLIENT_SYNC_RENEW_BOUNDARY || 155 if( OT_PEERTIME( peer_dest ) > OT_CLIENT_SYNC_RENEW_BOUNDARY ||
157 ( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(peer) & PEER_FLAG_COMPLETED ) ) ) 156 ( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(&ws->peer) & PEER_FLAG_COMPLETED ) ) )
158 livesync_tell( hash, peer ); 157 livesync_tell( ws );
159 } 158 }
160#endif 159#endif
161 160
162 if( (OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && !(OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) ) 161 if( (OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && !(OT_PEERFLAG(&ws->peer) & PEER_FLAG_SEEDING ) )
163 torrent->peer_list->seed_count--; 162 torrent->peer_list->seed_count--;
164 if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && (OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) ) 163 if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && (OT_PEERFLAG(&ws->peer) & PEER_FLAG_SEEDING ) )
165 torrent->peer_list->seed_count++; 164 torrent->peer_list->seed_count++;
166 if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(peer) & PEER_FLAG_COMPLETED ) ) { 165 if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(&ws->peer) & PEER_FLAG_COMPLETED ) ) {
167 torrent->peer_list->down_count++; 166 torrent->peer_list->down_count++;
168 stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)torrent->hash ); 167 stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)ws );
169 } 168 }
170 if( OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) 169 if( OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED )
171 OT_PEERFLAG( peer ) |= PEER_FLAG_COMPLETED; 170 OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_COMPLETED;
172 } 171 }
173 172
174 memcpy( peer_dest, peer, sizeof(ot_peer) ); 173 memcpy( peer_dest, &ws->peer, sizeof(ot_peer) );
175#ifdef WANT_SYNC 174#ifdef WANT_SYNC
176 if( proto == FLAG_MCA ) { 175 if( proto == FLAG_MCA ) {
177 mutex_bucket_unlock_by_hash( hash, delta_torrentcount ); 176 mutex_bucket_unlock_by_hash( *ws->hash, delta_torrentcount );
178 return 0; 177 return 0;
179 } 178 }
180#endif 179#endif
181 180
182 reply_size = return_peers_for_torrent( torrent, amount, reply, proto ); 181 ws->reply_size = return_peers_for_torrent( torrent, amount, ws->reply, proto );
183 mutex_bucket_unlock_by_hash( torrent->hash, delta_torrentcount ); 182 mutex_bucket_unlock_by_hash( *ws->hash, delta_torrentcount );
184 return reply_size; 183 return ws->reply_size;
185} 184}
186 185
187static size_t return_peers_all( ot_peerlist *peer_list, char *reply ) { 186static size_t return_peers_all( ot_peerlist *peer_list, char *reply ) {
@@ -350,23 +349,22 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash_list, int amount, char *repl
350} 349}
351 350
352static ot_peerlist dummy_list; 351static ot_peerlist dummy_list;
353size_t remove_peer_from_torrent( ot_hash hash, ot_peer *peer, char *reply, PROTO_FLAG proto ) { 352size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws ) {
354 int exactmatch; 353 int exactmatch;
355 size_t reply_size = 0; 354 ot_vector *torrents_list = mutex_bucket_lock_by_hash( *ws->hash );
356 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 355 ot_torrent *torrent = binary_search( ws->hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
357 ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
358 ot_peerlist *peer_list = &dummy_list; 356 ot_peerlist *peer_list = &dummy_list;
359 357
360#ifdef WANT_SYNC_LIVE 358#ifdef WANT_SYNC_LIVE
361 if( proto != FLAG_MCA ) { 359 if( proto != FLAG_MCA ) {
362 OT_PEERFLAG( peer ) |= PEER_FLAG_STOPPED; 360 OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_STOPPED;
363 livesync_tell( hash, peer ); 361 livesync_tell( ws );
364 } 362 }
365#endif 363#endif
366 364
367 if( exactmatch ) { 365 if( exactmatch ) {
368 peer_list = torrent->peer_list; 366 peer_list = torrent->peer_list;
369 switch( vector_remove_peer( &peer_list->peers, peer ) ) { 367 switch( vector_remove_peer( &peer_list->peers, &ws->peer ) ) {
370 case 2: peer_list->seed_count--; /* Fall throughs intended */ 368 case 2: peer_list->seed_count--; /* Fall throughs intended */
371 case 1: peer_list->peer_count--; /* Fall throughs intended */ 369 case 1: peer_list->peer_count--; /* Fall throughs intended */
372 default: break; 370 default: break;
@@ -375,19 +373,19 @@ size_t remove_peer_from_torrent( ot_hash hash, ot_peer *peer, char *reply, PROTO
375 373
376 if( proto == FLAG_TCP ) { 374 if( proto == FLAG_TCP ) {
377 int erval = OT_CLIENT_REQUEST_INTERVAL_RANDOM; 375 int erval = OT_CLIENT_REQUEST_INTERVAL_RANDOM;
378 reply_size = sprintf( reply, "d8:completei%zde10:incompletei%zde8:intervali%ie12:min intervali%ie" PEERS_BENCODED "0:e", peer_list->seed_count, peer_list->peer_count - peer_list->seed_count, erval, erval / 2 ); 376 ws->reply_size = sprintf( ws->reply, "d8:completei%zde10:incompletei%zde8:intervali%ie12:min intervali%ie" PEERS_BENCODED "0:e", peer_list->seed_count, peer_list->peer_count - peer_list->seed_count, erval, erval / 2 );
379 } 377 }
380 378
381 /* Handle UDP reply */ 379 /* Handle UDP reply */
382 if( proto == FLAG_UDP ) { 380 if( proto == FLAG_UDP ) {
383 ((uint32_t*)reply)[2] = htonl( OT_CLIENT_REQUEST_INTERVAL_RANDOM ); 381 ((uint32_t*)ws->reply)[2] = htonl( OT_CLIENT_REQUEST_INTERVAL_RANDOM );
384 ((uint32_t*)reply)[3] = htonl( peer_list->peer_count - peer_list->seed_count ); 382 ((uint32_t*)ws->reply)[3] = htonl( peer_list->peer_count - peer_list->seed_count );
385 ((uint32_t*)reply)[4] = htonl( peer_list->seed_count); 383 ((uint32_t*)ws->reply)[4] = htonl( peer_list->seed_count);
386 reply_size = 20; 384 ws->reply_size = 20;
387 } 385 }
388 386
389 mutex_bucket_unlock_by_hash( hash, 0 ); 387 mutex_bucket_unlock_by_hash( *ws->hash, 0 );
390 return reply_size; 388 return ws->reply_size;
391} 389}
392 390
393void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ) { 391void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ) {
diff --git a/trackerlogic.h b/trackerlogic.h
index 4052fa4..5ae644b 100644
--- a/trackerlogic.h
+++ b/trackerlogic.h
@@ -108,22 +108,29 @@ struct ot_peerlist {
108 108
109struct ot_workstruct { 109struct ot_workstruct {
110 /* Thread specific, static */ 110 /* Thread specific, static */
111 char *inbuf; 111 char *inbuf;
112#define G_INBUF_SIZE 8192 112#define G_INBUF_SIZE 8192
113 char *outbuf; 113 char *outbuf;
114#define G_OUTBUF_SIZE 8192 114#define G_OUTBUF_SIZE 8192
115#ifdef _DEBUG_HTTPERROR 115#ifdef _DEBUG_HTTPERROR
116 char *debugbuf; 116 char *debugbuf;
117#define G_DEBUGBUF_SIZE 8192 117#define G_DEBUGBUF_SIZE 8192
118#endif 118#endif
119 119
120 /* The peer currently in the working */
121 ot_peer peer;
122
123 /* Pointers into the request buffer */
124 ot_hash *hash;
125 char *peer_id;
126
120 /* HTTP specific, non static */ 127 /* HTTP specific, non static */
121 int keep_alive; 128 int keep_alive;
122 char *request; 129 char *request;
123 ssize_t request_size; 130 ssize_t request_size;
124 ssize_t header_size; 131 ssize_t header_size;
125 char *reply; 132 char *reply;
126 ssize_t reply_size; 133 ssize_t reply_size;
127}; 134};
128 135
129/* 136/*
@@ -150,9 +157,8 @@ void exerr( char * message );
150 157
151/* add_peer_to_torrent does only release the torrent bucket if from_sync is set, 158/* add_peer_to_torrent does only release the torrent bucket if from_sync is set,
152 otherwise it is released in return_peers_for_torrent */ 159 otherwise it is released in return_peers_for_torrent */
153#define add_peer_to_torrent(hash,peer,proto) add_peer_to_torrent_and_return_peers(hash,peer,proto,0,NULL) 160size_t add_peer_to_torrent_and_return_peers( PROTO_FLAG proto, struct ot_workstruct *ws, size_t amount );
154size_t add_peer_to_torrent_and_return_peers( ot_hash hash, ot_peer *peer, PROTO_FLAG proto, size_t amount, char * reply ); 161size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws );
155size_t remove_peer_from_torrent( ot_hash hash, ot_peer *peer, char *reply, PROTO_FLAG proto );
156size_t return_tcp_scrape_for_torrent( ot_hash *hash, int amount, char *reply ); 162size_t return_tcp_scrape_for_torrent( ot_hash *hash, int amount, char *reply );
157size_t return_udp_scrape_for_torrent( ot_hash hash, char *reply ); 163size_t return_udp_scrape_for_torrent( ot_hash hash, char *reply );
158void add_torrent_from_saved_state( ot_hash hash, ot_time base, size_t down_count ); 164void add_torrent_from_saved_state( ot_hash hash, ot_time base, size_t down_count );