From d42bf5a0310b8df4babff645ee91c37e9f994bfe Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Thu, 22 Apr 2010 22:08:42 +0000 Subject: ** struct ot_workstruct gets ritcher (and will become even ritcher soon). This is where we encapsulate all per-request data from peer to hash to peer_id, so that it is available everywhere without passing hundreds of pointers down the stack. Most functions that do work down the stack now accept an ot_workstruct and some flags. So it can end up in the stats/event-handler where it will be the default parameter in the future. ** peer_id is now being copied by default and moved to ot_workstruct So it is available in stats and subsequent functions. ** sync scrape madness is gone SYNC_SCRAPE was intended to sync tracker state that would normally be lost on restarts i.e. downloaded counts per torrent. The way was to push it in the tracker cloud after finding all neighbouring trackers. This is madness. It never was tested and can be done per tracker by fetching stats/mode=statedump from time to time and starting opentracker with the -l option later. ** livesync thread has its own ot_workstruct now So it can behave like ot_udp and ot_http against trackerlogic.c and get rid of the first half of the embarrassing global variables. The sending half will be fixed soon [tm]. ** stats can log completed events The author recognizes the needs of original content distributors to keep track of the amount of times a work has been downloaded. While not feasible and used on openbittorrent and other open and anonymous tracker installations, a tracker user can now choose to send those events to syslog. --- ot_livesync.c | 292 ++++++++-------------------------------------------------- 1 file changed, 36 insertions(+), 256 deletions(-) (limited to 'ot_livesync.c') diff --git a/ot_livesync.c b/ot_livesync.c index 9e1c723..87fe5cf 100644 --- a/ot_livesync.c +++ b/ot_livesync.c @@ -33,23 +33,9 @@ char groupip_1[4] = { 224,0,23,5 }; #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) -#ifdef WANT_SYNC_SCRAPE -#define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504 -#define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t)) -#define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100 - -#define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */ -#define LIVESYNC_BEACON_INTERVAL 60 /* seconds */ -#define LIVESYNC_INQUIRE_THRESH 0.75 -#endif /* WANT_SYNC_SCRAPE */ - #define LIVESYNC_MAXDELAY 15 /* seconds */ -enum { OT_SYNC_PEER -#ifdef WANT_SYNC_SCRAPE - , OT_SYNC_SCRAPE_BEACON, OT_SYNC_SCRAPE_INQUIRE, OT_SYNC_SCRAPE_TELL -#endif -}; +enum { OT_SYNC_PEER }; /* Forward declaration */ static void * livesync_worker( void * args ); @@ -59,52 +45,24 @@ static int64 g_socket_in = -1; /* For incoming packets */ static int64 g_socket_out = -1; -static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; - -static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; -static uint8_t *g_peerbuffer_pos; -static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; +char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; +static size_t g_outbuf_data; static ot_time g_next_packet_time; -#ifdef WANT_SYNC_SCRAPE -/* Live sync scrape buffers, states and timers */ -static ot_time g_next_beacon_time; -static ot_time g_next_inquire_time; - -static uint8_t g_scrapebuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE]; -static uint8_t *g_scrapebuffer_pos; -static uint8_t *g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE; - -static size_t g_inquire_remote_count; -static uint32_t g_inquire_remote_host; -static int g_inquire_inprogress; -static int g_inquire_bucket; -#endif /* WANT_SYNC_SCRAPE */ - static pthread_t thread_id; void livesync_init( ) { + if( g_socket_in == -1 ) exerr( "No socket address for live sync specified." ); /* Prepare outgoing peers buffer */ - g_peerbuffer_pos = g_peerbuffer_start; - memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); - g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); - -#ifdef WANT_SYNC_SCRAPE - /* Prepare outgoing scrape buffer */ - g_scrapebuffer_pos = g_scrapebuffer_start; - memcpy( g_scrapebuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)g_scrapebuffer_pos + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_TELL); - g_scrapebuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); - - /* Wind up timers for inquires */ - g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY; -#endif /* WANT_SYNC_SCRAPE */ - g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) ); + uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER); + g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); + g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + pthread_create( &thread_id, NULL, livesync_worker, NULL ); } @@ -148,264 +106,86 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { } static void livesync_issue_peersync( ) { - socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, - groupip_1, LIVESYNC_PORT); - g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); + socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT); + g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; } -static void livesync_handle_peersync( ssize_t datalen ) { +static void livesync_handle_peersync( struct ot_workstruct *ws ) { int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); /* Now basic sanity checks have been done on the live sync packet We might add more testing and logging. */ - while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { - ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); - ot_hash *hash = (ot_hash*)(g_inbuffer + off); + while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= ws->request_size ) { + memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), sizeof( ot_peer ) ); + ws->hash = (ot_hash*)(ws->request + off); if( !g_opentracker_running ) return; - if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) - remove_peer_from_torrent( *hash, peer, NULL, FLAG_MCA ); + if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_STOPPED ) + remove_peer_from_torrent( FLAG_MCA, ws ); else - add_peer_to_torrent( *hash, peer, FLAG_MCA ); + add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); off += sizeof( ot_hash ) + sizeof( ot_peer ); } stats_issue_event(EVENT_SYNC, 0, - (datalen - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / + (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); } -#ifdef WANT_SYNC_SCRAPE -void livesync_issue_beacon( ) { - size_t torrent_count = mutex_get_torrent_count(); - uint8_t beacon[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ]; - - memcpy( beacon, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_BEACON); - uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + sizeof(uint32_t), (uint32_t)((uint64_t)(torrent_count)>>32) ); - uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + 2 * sizeof(uint32_t), (uint32_t)torrent_count ); - - socket_send4(g_socket_out, (char*)beacon, sizeof(beacon), groupip_1, LIVESYNC_PORT); -} - -void livesync_handle_beacon( ssize_t datalen ) { - size_t torrent_count_local, torrent_count_remote; - if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ) - return; - torrent_count_local = mutex_get_torrent_count(); - torrent_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + sizeof(uint32_t))) << 32); - torrent_count_remote |= (size_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + 2 * sizeof(uint32_t)); - - /* Empty tracker is useless */ - if( !torrent_count_remote ) return; - - if( ((double)torrent_count_local ) / ((double)torrent_count_remote) < LIVESYNC_INQUIRE_THRESH) { - if( !g_next_inquire_time ) { - g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL; - g_inquire_remote_count = 0; - } - - if( torrent_count_remote > g_inquire_remote_count ) { - g_inquire_remote_count = torrent_count_remote; - memcpy( &g_inquire_remote_host, g_inbuffer, sizeof( g_tracker_id ) ); - } - } -} - -void livesync_issue_inquire( ) { - uint8_t inquire[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id)]; - - memcpy( inquire, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)inquire + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_INQUIRE); - memcpy( inquire + sizeof(g_tracker_id) + sizeof(uint32_t), &g_inquire_remote_host, sizeof( g_tracker_id ) ); - - socket_send4(g_socket_out, (char*)inquire, sizeof(inquire), groupip_1, LIVESYNC_PORT); -} - -void livesync_handle_inquire( ssize_t datalen ) { - if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id) ) - return; - - /* If it isn't us, they're inquiring, ignore inquiry */ - if( memcmp( &g_tracker_id, g_inbuffer, sizeof( g_tracker_id ) ) ) - return; - - /* Start scrape tell on next ticker */ - if( !g_inquire_inprogress ) { - g_inquire_inprogress = 1; - g_inquire_bucket = 0; - } -} - -void livesync_issue_tell( ) { - int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE; - while( packets_to_send > 0 && g_inquire_bucket < OT_BUCKET_COUNT ) { - ot_vector *torrents_list = mutex_bucket_lock( g_inquire_bucket ); - unsigned int j; - for( j=0; jsize; ++j ) { - ot_torrent *torrent = (ot_torrent*)(torrents_list->data) + j; - memcpy(g_scrapebuffer_pos, torrent->hash, sizeof(ot_hash)); - g_scrapebuffer_pos += sizeof(ot_hash); - uint32_pack_big( (char*)g_scrapebuffer_pos , (uint32_t)(g_now_minutes - torrent->peer_list->base )); - uint32_pack_big( (char*)g_scrapebuffer_pos + 4, (uint32_t)((uint64_t)(torrent->peer_list->down_count)>>32) ); - uint32_pack_big( (char*)g_scrapebuffer_pos + 8, (uint32_t)torrent->peer_list->down_count ); - g_scrapebuffer_pos += 12; - - if( g_scrapebuffer_pos >= g_scrapebuffer_highwater ) { - socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); - g_scrapebuffer_pos = g_scrapebuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t); - --packets_to_send; - } - } - mutex_bucket_unlock( g_inquire_bucket++, 0 ); - if( !g_opentracker_running ) - return; - } - if( g_inquire_bucket == OT_BUCKET_COUNT ) { - socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); - g_inquire_inprogress = 0; - } -} - -void livesync_handle_tell( ssize_t datalen ) { - int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); - - /* Some instance is in progress of telling. Our inquiry was successful. - Don't ask again until we see next beacon. */ - g_next_inquire_time = 0; - - /* Don't cause any new inquiries during another tracker's tell */ - if( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL ) - g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; - - while( off + sizeof(ot_hash) + 12 <= (size_t)datalen ) { - ot_hash *hash = (ot_hash*)(g_inbuffer+off); - ot_vector *torrents_list = mutex_bucket_lock_by_hash(*hash); - size_t down_count_remote; - int exactmatch; - ot_torrent *torrent = vector_find_or_insert(torrents_list, hash, sizeof(ot_hash), OT_HASH_COMPARE_SIZE, &exactmatch); - - if( !torrent ) { - mutex_bucket_unlock_by_hash( *hash, 0 ); - continue; - } - - if( !exactmatch ) { - /* Create a new torrent entry, then */ - memcpy( &torrent->hash, hash, sizeof(ot_hash)); - - if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { - vector_remove_torrent( torrents_list, torrent ); - mutex_bucket_unlock_by_hash( *hash, 0 ); - continue; - } - - byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); - torrent->peer_list->base = g_now_minutes - uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash)); - } - - down_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + sizeof(uint32_t))) << 32); - down_count_remote |= (size_t) uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + 2 * sizeof(uint32_t)); - - if( down_count_remote > torrent->peer_list->down_count ) - torrent->peer_list->down_count = down_count_remote; - /* else - We might think of sending a tell packet, if we have a much larger downloaded count - */ - - mutex_bucket_unlock( g_inquire_bucket++, exactmatch?0:1 ); - if( !g_opentracker_running ) - return; - off += sizeof(ot_hash) + 12; - } -} -#endif /* WANT_SYNC_SCRAPE */ - /* Tickle the live sync module from time to time, so no events get stuck when there's not enough traffic to fill udp packets fast enough */ void livesync_ticker( ) { - /* livesync_issue_peersync sets g_next_packet_time */ if( g_now_seconds > g_next_packet_time && - g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) + g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) livesync_issue_peersync(); - -#ifdef WANT_SYNC_SCRAPE - /* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY - seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */ - if( g_now_seconds > g_next_beacon_time ) { - livesync_issue_beacon( ); - g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; - } - - /* If we're interested in an inquiry and waited long enough to see all - tracker's beacons, go ahead and inquire */ - if( g_next_inquire_time && g_now_seconds > g_next_inquire_time ) { - livesync_issue_inquire(); - - /* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */ - g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; - } - - /* If we're in process of telling, let's tell. */ - if( g_inquire_inprogress ) - livesync_issue_tell( ); - -#endif /* WANT_SYNC_SCRAPE */ } /* Inform live sync about whats going on. */ -void livesync_tell( ot_hash const info_hash, const ot_peer * const peer ) { +void livesync_tell( struct ot_workstruct *ws ) { - memcpy( g_peerbuffer_pos, info_hash, sizeof(ot_hash) ); - memcpy( g_peerbuffer_pos+sizeof(ot_hash), peer, sizeof(ot_peer) ); + memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); + memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); - g_peerbuffer_pos += sizeof(ot_hash)+sizeof(ot_peer); + g_outbuf_data += sizeof(ot_hash) + sizeof(ot_peer); - if( g_peerbuffer_pos >= g_peerbuffer_highwater ) + if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) livesync_issue_peersync(); } static void * livesync_worker( void * args ) { + struct ot_workstruct ws; ot_ip6 in_ip; uint16_t in_port; - ssize_t datalen; (void)args; - + + /* Initialize our "thread local storage" */ + ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); + ws.outbuf = ws.reply = 0; + memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); while( 1 ) { - datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); + ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); /* Expect at least tracker id and packet type */ - if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) + if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) continue; if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) continue; - if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { + if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) { /* TODO: log packet coming from ourselves */ continue; } - switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { + switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { case OT_SYNC_PEER: - livesync_handle_peersync( datalen ); - break; -#ifdef WANT_SYNC_SCRAPE - case OT_SYNC_SCRAPE_BEACON: - livesync_handle_beacon( datalen ); - break; - case OT_SYNC_SCRAPE_INQUIRE: - livesync_handle_inquire( datalen ); - break; - case OT_SYNC_SCRAPE_TELL: - livesync_handle_tell( datalen ); + livesync_handle_peersync( &ws ); break; -#endif /* WANT_SYNC_SCRAPE */ default: break; } -- cgit v1.2.3