summaryrefslogtreecommitdiff
path: root/ot_livesync.c
diff options
context:
space:
mode:
authorerdgeist <>2009-01-02 08:57:53 +0000
committererdgeist <>2009-01-02 08:57:53 +0000
commit2df09905f5540fee096d48a92cb0c42558498a12 (patch)
tree68eab61d29719400972485de395dd0465467aea6 /ot_livesync.c
parent548e2b8338b5ee8d24fa928e833f345bb5cb6f0e (diff)
* opentracker now drops permissions in correct order and really chroots() if ran as root
* lock passing between add_peer_to_torrent and return_peers_for_torrent is now avoided by providing a more general add_peer_to_torrent_and_return_peers function that can be used with NULL parameters to not return any peers (in sync case) * in order to keep a fast overview how many torrents opentracker maintains, every mutex_bucket_unlock operation expects an additional integer parameter that tells ot_mutex.c how many torrents have been added or removed. A function mutex_get_torrent_count has been introduced.
Diffstat (limited to 'ot_livesync.c')
-rw-r--r--ot_livesync.c400
1 files changed, 319 insertions, 81 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index 3cad121..47a371a 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -9,59 +9,109 @@
9#include <string.h> 9#include <string.h>
10#include <pthread.h> 10#include <pthread.h>
11#include <unistd.h> 11#include <unistd.h>
12#include <stdlib.h>
12 13
13/* Libowfat */ 14/* Libowfat */
14#include "socket.h" 15#include "socket.h"
15#include "ndelay.h" 16#include "ndelay.h"
17#include "byte.h"
16 18
17/* Opentracker */ 19/* Opentracker */
18#include "trackerlogic.h" 20#include "trackerlogic.h"
19#include "ot_livesync.h" 21#include "ot_livesync.h"
20#include "ot_accesslist.h" 22#include "ot_accesslist.h"
21#include "ot_stats.h" 23#include "ot_stats.h"
24#include "ot_mutex.h"
22 25
23#ifdef WANT_SYNC_LIVE 26#ifdef WANT_SYNC_LIVE
24 27
25char groupip_1[4] = { 224,0,23,42 }; 28char groupip_1[4] = { 224,0,23,5 };
26 29
27#define LIVESYNC_BUFFINSIZE (256*256) 30#define LIVESYNC_INCOMING_BUFFSIZE (256*256)
28#define LIVESYNC_BUFFSIZE 1504
29#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
30 31
31#define LIVESYNC_MAXDELAY 15 32#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1504
33#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
34
35#ifdef WANT_SYNC_SCRAPE
36#define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504
37#define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t))
38#define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100
39
40#define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */
41#define LIVESYNC_BEACON_INTERVAL 60 /* seconds */
42#define LIVESYNC_INQUIRE_THRESH 0.75
43#endif /* WANT_SYNC_SCRAPE */
44
45#define LIVESYNC_MAXDELAY 15 /* seconds */
46
47enum { OT_SYNC_PEER
48#ifdef WANT_SYNC_SCRAPE
49 , OT_SYNC_SCRAPE_BEACON, OT_SYNC_SCRAPE_INQUIRE, OT_SYNC_SCRAPE_TELL
50#endif
51};
32 52
33/* Forward declaration */ 53/* Forward declaration */
34static void * livesync_worker( void * args ); 54static void * livesync_worker( void * args );
35 55
36/* For outgoing packets */ 56/* For outgoing packets */
37static int64 g_livesync_socket_in = -1; 57static int64 g_socket_in = -1;
38 58
39/* For incoming packets */ 59/* For incoming packets */
40static int64 g_livesync_socket_out = -1; 60static int64 g_socket_out = -1;
61static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
41 62
42static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE]; 63static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
43static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ]; 64static uint8_t *g_peerbuffer_pos;
44static uint8_t *livesync_outbuffer_pos; 65static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
45static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER; 66
46static ot_time livesync_lastpacket_time; 67static ot_time g_next_packet_time;
68
69#ifdef WANT_SYNC_SCRAPE
70/* Live sync scrape buffers, states and timers */
71static ot_time g_next_beacon_time;
72static ot_time g_next_inquire_time;
73
74static uint8_t g_scrapebuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE];
75static uint8_t *g_scrapebuffer_pos;
76static uint8_t *g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE;
77
78static size_t g_inquire_remote_count;
79static uint32_t g_inquire_remote_host;
80static int g_inquire_inprogress;
81static int g_inquire_bucket;
82#endif /* WANT_SYNC_SCRAPE */
47 83
48static pthread_t thread_id; 84static pthread_t thread_id;
49void livesync_init( ) { 85void livesync_init( ) {
50 if( g_livesync_socket_in == -1 ) 86 if( g_socket_in == -1 )
51 exerr( "No socket address for live sync specified." ); 87 exerr( "No socket address for live sync specified." );
52 livesync_outbuffer_pos = livesync_outbuffer_start; 88
53 memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 89 /* Prepare outgoing peers buffer */
54 livesync_outbuffer_pos += sizeof( g_tracker_id ); 90 g_peerbuffer_pos = g_peerbuffer_start;
55 livesync_lastpacket_time = g_now_seconds; 91 memmove( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
92 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER);
93 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
94
95#ifdef WANT_SYNC_SCRAPE
96 /* Prepare outgoing scrape buffer */
97 g_scrapebuffer_pos = g_scrapebuffer_start;
98 memmove( g_scrapebuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
99 uint32_pack_big( (char*)g_scrapebuffer_pos + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_TELL);
100 g_scrapebuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
101
102 /* Wind up timers for inquires */
103 g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY;
104#endif /* WANT_SYNC_SCRAPE */
105 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
56 106
57 pthread_create( &thread_id, NULL, livesync_worker, NULL ); 107 pthread_create( &thread_id, NULL, livesync_worker, NULL );
58} 108}
59 109
60void livesync_deinit() { 110void livesync_deinit() {
61 if( g_livesync_socket_in != -1 ) 111 if( g_socket_in != -1 )
62 close( g_livesync_socket_in ); 112 close( g_socket_in );
63 if( g_livesync_socket_out != -1 ) 113 if( g_socket_out != -1 )
64 close( g_livesync_socket_out ); 114 close( g_socket_out );
65 115
66 pthread_cancel( thread_id ); 116 pthread_cancel( thread_id );
67} 117}
@@ -69,104 +119,292 @@ void livesync_deinit() {
69void livesync_bind_mcast( char *ip, uint16_t port) { 119void livesync_bind_mcast( char *ip, uint16_t port) {
70 char tmpip[4] = {0,0,0,0}; 120 char tmpip[4] = {0,0,0,0};
71 121
72 if( g_livesync_socket_in != -1 ) 122 if( g_socket_in != -1 )
73 exerr("Error: Livesync listen ip specified twice."); 123 exerr("Error: Livesync listen ip specified twice.");
74 124
75 if( ( g_livesync_socket_in = socket_udp4( )) < 0) 125 if( ( g_socket_in = socket_udp4( )) < 0)
76 exerr("Error: Cant create live sync incoming socket." ); 126 exerr("Error: Cant create live sync incoming socket." );
77 ndelay_off(g_livesync_socket_in); 127 ndelay_off(g_socket_in);
78 128
79 if( socket_bind4_reuse( g_livesync_socket_in, tmpip, port ) == -1 ) 129 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 )
80 exerr("Error: Cant bind live sync incoming socket." ); 130 exerr("Error: Cant bind live sync incoming socket." );
81 131
82 if( socket_mcjoin4( g_livesync_socket_in, groupip_1, ip ) ) 132 if( socket_mcjoin4( g_socket_in, groupip_1, ip ) )
83 exerr("Error: Cant make live sync incoming socket join mcast group."); 133 exerr("Error: Cant make live sync incoming socket join mcast group.");
84 134
85 if( ( g_livesync_socket_out = socket_udp4()) < 0) 135 if( ( g_socket_out = socket_udp4()) < 0)
86 exerr("Error: Cant create live sync outgoing socket." ); 136 exerr("Error: Cant create live sync outgoing socket." );
87 if( socket_bind4_reuse( g_livesync_socket_out, ip, port ) == -1 ) 137 if( socket_bind4_reuse( g_socket_out, ip, port ) == -1 )
88 exerr("Error: Cant bind live sync outgoing socket." ); 138 exerr("Error: Cant bind live sync outgoing socket." );
89 139
90 socket_mcttl4(g_livesync_socket_out, 1); 140 socket_mcttl4(g_socket_out, 1);
91 socket_mcloop4(g_livesync_socket_out, 0); 141 socket_mcloop4(g_socket_out, 0);
92} 142}
93 143
94static void livesync_issuepacket( ) { 144static void livesync_issue_peersync( ) {
95 socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start, 145 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
96 groupip_1, LIVESYNC_PORT); 146 groupip_1, LIVESYNC_PORT);
97 livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id ); 147 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
98 livesync_lastpacket_time = g_now_seconds; 148 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
99} 149}
100 150
101/* Inform live sync about whats going on. */ 151static void livesync_handle_peersync( ssize_t datalen ) {
102void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer ) { 152 int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
103 int i; 153
104 for(i=0;i<20;i+=4) WRITE32(livesync_outbuffer_pos,i,READ32(info_hash,i)); 154 /* Now basic sanity checks have been done on the live sync packet
105 WRITE32(livesync_outbuffer_pos,20,READ32(peer,0)); 155 We might add more testing and logging. */
106 WRITE32(livesync_outbuffer_pos,24,READ32(peer,4)); 156 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
107 livesync_outbuffer_pos += 28; 157 ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash));
108 158 ot_hash *hash = (ot_hash*)(g_inbuffer + off);
109 if( livesync_outbuffer_pos >= livesync_outbuffer_highwater ) 159
110 livesync_issuepacket(); 160 if( !g_opentracker_running ) return;
161
162 if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED )
163 remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA );
164 else
165 add_peer_to_torrent( hash, peer, FLAG_MCA );
166
167 off += sizeof( ot_hash ) + sizeof( ot_peer );
168 }
169
170 stats_issue_event(EVENT_SYNC, 0, datalen / ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer )));
111} 171}
112 172
173#ifdef WANT_SYNC_SCRAPE
174void livesync_issue_beacon( ) {
175 size_t torrent_count = mutex_get_torrent_count();
176 uint8_t beacon[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ];
177
178 memmove( beacon, &g_tracker_id, sizeof( g_tracker_id ) );
179 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_BEACON);
180 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + sizeof(uint32_t), (uint32_t)((uint64_t)(torrent_count)>>32) );
181 uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + 2 * sizeof(uint32_t), (uint32_t)torrent_count );
182
183 socket_send4(g_socket_out, (char*)beacon, sizeof(beacon), groupip_1, LIVESYNC_PORT);
184}
185
186void livesync_handle_beacon( ssize_t datalen ) {
187 size_t torrent_count_local, torrent_count_remote;
188 if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) )
189 return;
190 torrent_count_local = mutex_get_torrent_count();
191 torrent_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + sizeof(uint32_t))) << 32);
192 torrent_count_remote |= (size_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + 2 * sizeof(uint32_t));
193
194 /* Empty tracker is useless */
195 if( !torrent_count_remote ) return;
196
197 if( ((double)torrent_count_local ) / ((double)torrent_count_remote) < LIVESYNC_INQUIRE_THRESH) {
198 if( !g_next_inquire_time ) {
199 g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL;
200 g_inquire_remote_count = 0;
201 }
202
203 if( torrent_count_remote > g_inquire_remote_count ) {
204 g_inquire_remote_count = torrent_count_remote;
205 memmove( &g_inquire_remote_host, g_inbuffer, sizeof( g_tracker_id ) );
206 }
207 }
208}
209
210void livesync_issue_inquire( ) {
211 uint8_t inquire[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id)];
212
213 memmove( inquire, &g_tracker_id, sizeof( g_tracker_id ) );
214 uint32_pack_big( (char*)inquire + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_INQUIRE);
215 memmove( inquire + sizeof(g_tracker_id) + sizeof(uint32_t), &g_inquire_remote_host, sizeof( g_tracker_id ) );
216
217 socket_send4(g_socket_out, (char*)inquire, sizeof(inquire), groupip_1, LIVESYNC_PORT);
218}
219
220void livesync_handle_inquire( ssize_t datalen ) {
221 if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id) )
222 return;
223
224 /* If it isn't us, they're inquiring, ignore inquiry */
225 if( memcmp( &g_tracker_id, g_inbuffer, sizeof( g_tracker_id ) ) )
226 return;
227
228 /* Start scrape tell on next ticker */
229 if( !g_inquire_inprogress ) {
230 g_inquire_inprogress = 1;
231 g_inquire_bucket = 0;
232 }
233}
234
235void livesync_issue_tell( ) {
236 int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE;
237 while( packets_to_send > 0 && g_inquire_bucket < OT_BUCKET_COUNT ) {
238 ot_vector *torrents_list = mutex_bucket_lock( g_inquire_bucket );
239 unsigned int j;
240 for( j=0; j<torrents_list->size; ++j ) {
241 ot_torrent *torrent = (ot_torrent*)(torrents_list->data) + j;
242 memmove(g_scrapebuffer_pos, torrent->hash, sizeof(ot_hash));
243 g_scrapebuffer_pos += sizeof(ot_hash);
244 uint32_pack_big( (char*)g_scrapebuffer_pos , (uint32_t)(g_now_minutes - torrent->peer_list->base ));
245 uint32_pack_big( (char*)g_scrapebuffer_pos + 4, (uint32_t)((uint64_t)(torrent->peer_list->down_count)>>32) );
246 uint32_pack_big( (char*)g_scrapebuffer_pos + 8, (uint32_t)torrent->peer_list->down_count );
247 g_scrapebuffer_pos += 12;
248
249 if( g_scrapebuffer_pos >= g_scrapebuffer_highwater ) {
250 socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT);
251 g_scrapebuffer_pos = g_scrapebuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t);
252 --packets_to_send;
253 }
254 }
255 mutex_bucket_unlock( g_inquire_bucket++, 0 );
256 if( !g_opentracker_running )
257 return;
258 }
259 if( g_inquire_bucket == OT_BUCKET_COUNT ) {
260 socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT);
261 g_inquire_inprogress = 0;
262 }
263}
264
265void livesync_handle_tell( ssize_t datalen ) {
266 int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
267
268 /* Some instance is in progress of telling. Our inquiry was successful.
269 Don't ask again until we see next beacon. */
270 g_next_inquire_time = 0;
271
272 /* Don't cause any new inquiries during another tracker's tell */
273 if( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL )
274 g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
275
276 while( off + sizeof(ot_hash) + 12 <= (size_t)datalen ) {
277 ot_hash *hash = (ot_hash*)(g_inbuffer+off);
278 ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
279 size_t down_count_remote;
280 int exactmatch;
281 ot_torrent * torrent = vector_find_or_insert(torrents_list, hash, sizeof(ot_hash), OT_HASH_COMPARE_SIZE, &exactmatch);
282 if( !torrent ) {
283 mutex_bucket_unlock_by_hash( hash, 0 );
284 continue;
285 }
286
287 if( !exactmatch ) {
288 /* Create a new torrent entry, then */
289 int i; for(i=0;i<20;i+=4) WRITE32(&torrent->hash,i,READ32(hash,i));
290
291 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
292 vector_remove_torrent( torrents_list, torrent );
293 mutex_bucket_unlock_by_hash( hash, 0 );
294 continue;
295 }
296
297 byte_zero( torrent->peer_list, sizeof( ot_peerlist ) );
298 torrent->peer_list->base = g_now_minutes - uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash));
299 }
300
301 down_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+off+sizeof( ot_hash ) + sizeof(uint32_t))) << 32);
302 down_count_remote |= (size_t) uint32_read_big((char*)g_inbuffer+off+sizeof( ot_hash ) + 2 * sizeof(uint32_t));
303
304 if( down_count_remote > torrent->peer_list->down_count )
305 torrent->peer_list->down_count = down_count_remote;
306 /* else
307 We might think of sending a tell packet, if we have a much larger downloaded count
308 */
309
310 mutex_bucket_unlock( g_inquire_bucket++, exactmatch?0:1 );
311 if( !g_opentracker_running )
312 return;
313 off += sizeof(ot_hash) + 12;
314 }
315}
316#endif /* WANT_SYNC_SCRAPE */
317
113/* Tickle the live sync module from time to time, so no events get 318/* Tickle the live sync module from time to time, so no events get
114 stuck when there's not enough traffic to fill udp packets fast 319 stuck when there's not enough traffic to fill udp packets fast
115 enough */ 320 enough */
116void livesync_ticker( ) { 321void livesync_ticker( ) {
117 if( ( g_now_seconds - livesync_lastpacket_time > LIVESYNC_MAXDELAY) && 322
118 ( livesync_outbuffer_pos > livesync_outbuffer_start + sizeof( g_tracker_id ) ) ) 323 /* livesync_issue_peersync sets g_next_packet_time */
119 livesync_issuepacket(); 324 if( g_now_seconds > g_next_packet_time &&
325 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
326 livesync_issue_peersync();
327
328#ifdef WANT_SYNC_SCRAPE
329 /* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY
330 seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */
331 if( g_now_seconds > g_next_beacon_time ) {
332 livesync_issue_beacon( );
333 g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
334 }
335
336 /* If we're interested in an inquiry and waited long enough to see all
337 tracker's beacons, go ahead and inquire */
338 if( g_next_inquire_time && g_now_seconds > g_next_inquire_time ) {
339 livesync_issue_inquire();
340
341 /* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */
342 g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL;
343 }
344
345 /* If we're in process of telling, let's tell. */
346 if( g_inquire_inprogress )
347 livesync_issue_tell( );
348
349#endif /* WANT_SYNC_SCRAPE */
350}
351
352/* Inform live sync about whats going on. */
353void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer ) {
354 unsigned int i;
355 for(i=0;i<sizeof(ot_hash)/4;i+=4) WRITE32(g_peerbuffer_pos,i,READ32(info_hash,i));
356
357 WRITE32(g_peerbuffer_pos,sizeof(ot_hash) ,READ32(peer,0));
358 WRITE32(g_peerbuffer_pos,sizeof(ot_hash)+4,READ32(peer,4));
359
360 g_peerbuffer_pos += sizeof(ot_hash)+8;
361
362 if( g_peerbuffer_pos >= g_peerbuffer_highwater )
363 livesync_issue_peersync();
120} 364}
121 365
122static void * livesync_worker( void * args ) { 366static void * livesync_worker( void * args ) {
123 uint8_t in_ip[4]; uint16_t in_port; 367 uint8_t in_ip[4]; uint16_t in_port;
124 ssize_t datalen; 368 ssize_t datalen;
125 int off;
126 369
127 args = args; 370 (void)args;
128 371
129 while( 1 ) { 372 while( 1 ) {
130 datalen = socket_recv4(g_livesync_socket_in, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port); 373 datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, (char*)in_ip, &in_port);
131 off = 4;
132 374
133 if( datalen <= 0 ) 375 /* Expect at least tracker id and packet type */
376 if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
134 continue; 377 continue;
135 378 if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC))
136 if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
137 /* TODO: log invalid sync packet */
138 continue; 379 continue;
139 } 380 if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
140
141 if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
142 /* TODO: log invalid sync packet */
143 continue;
144 }
145
146 if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
147 /* TODO: log packet coming from ourselves */ 381 /* TODO: log packet coming from ourselves */
148 continue; 382 continue;
149 } 383 }
150 384
151 /* Now basic sanity checks have been done on the live sync packet 385 switch( uint32_read_big( (char*)g_inbuffer ) ) {
152 We might add more testing and logging. */ 386 case OT_SYNC_PEER:
153 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { 387 livesync_handle_peersync( datalen );
154 ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash)); 388 break;
155 ot_hash *hash = (ot_hash*)(livesync_inbuffer + off); 389#ifdef WANT_SYNC_SCRAPE
156 390 case OT_SYNC_SCRAPE_BEACON:
157 if( !g_opentracker_running ) 391 livesync_handle_beacon( datalen );
158 return NULL; 392 break;
159 393 case OT_SYNC_SCRAPE_INQUIRE:
160 if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) 394 livesync_handle_inquire( datalen );
161 remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA); 395 break;
162 else 396 case OT_SYNC_SCRAPE_TELL:
163 add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1)); 397 livesync_handle_tell( datalen );
164 398 break;
165 off += sizeof( ot_hash ) + sizeof( ot_peer ); 399#endif /* WANT_SYNC_SCRAPE */
400 default:
401 break;
166 } 402 }
167 403
168 stats_issue_event(EVENT_SYNC, 0, datalen / ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); 404 /* Handle outstanding requests */
405 livesync_ticker( );
169 } 406 }
407
170 /* Never returns. */ 408 /* Never returns. */
171 return NULL; 409 return NULL;
172} 410}