From cde8cf0559a67596b6a4c611776f7abd153225e8 Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Thu, 1 Oct 2009 17:16:15 +0000 Subject: Network handling code skeleton stands, is untested and no configure code there, yet. --- proxy.c | 181 ++++++++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 143 insertions(+), 38 deletions(-) (limited to 'proxy.c') diff --git a/proxy.c b/proxy.c index c27599b..cdbdec5 100644 --- a/proxy.c +++ b/proxy.c @@ -30,9 +30,11 @@ #include "trackerlogic.h" #include "ot_vector.h" #include "ot_mutex.h" -#include "ot_livesync.h" #include "ot_stats.h" +#define WANT_SYNC_LIVE +#include "ot_livesync.h" + ot_ip6 g_serverip; uint16_t g_serverport = 9009; uint32_t g_tracker_id; @@ -49,6 +51,7 @@ int g_self_pipe[2]; #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) +#define LIVESYNC_MAXDELAY 15 /* seconds */ /* The amount of time a complete sync cycle should take */ #define OT_SYNC_INTERVAL_MINUTES 2 @@ -65,10 +68,14 @@ static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; /* For outgoing packets */ static int64 g_socket_out = -1; -//static uint8_t g_outbuffer[STREAMSYNC_OUTGOING_BUFFSIZE]; +static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; +static uint8_t *g_peerbuffer_pos; +static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; +static ot_time g_next_packet_time; static void * livesync_worker( void * args ); static void * streamsync_worker( void * args ); +static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); void exerr( char * message ) { fprintf( stderr, "%s\n", message ); @@ -226,15 +233,31 @@ enum { #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) typedef struct { - int state; /* Whether we want to connect, how far our handshake is, etc. */ - ot_ip6 ip; /* The peer to connect to */ - uint16_t port; /* The peers port */ - uint8_t *indata; /* Any data not processed yet */ - size_t indata_length; /* Length of unprocessed data */ - uint32_t tracker_id; /* How the other end greeted */ - int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ - io_batch outdata; /* The iobatch containing our sync data */ + int state; /* Whether we want to connect, how far our handshake is, etc. */ + ot_ip6 ip; /* The peer to connect to */ + uint16_t port; /* The peers port */ + uint8_t indata[8192*16]; /* Any data not processed yet */ + size_t indata_length; /* Length of unprocessed data */ + uint32_t tracker_id; /* How the other end greeted */ + int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ + io_batch outdata; /* The iobatch containing our sync data */ + + int packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ + char packet_tprefix; /* Prefix byte for all torrents in current packet */ + char packet_type; /* Type of current packet */ + uint32_t packet_tid; /* Tracker id for current packet */ + } proxy_peer; +static void process_indata( proxy_peer * peer ); + +void reset_info_block( proxy_peer * peer ) { + peer->indata_length = 0; + peer->tracker_id = 0; + peer->fd = -1; + peer->packet_tcount = 0; + iob_reset( &peer->outdata ); + PROXYPEER_SETDISCONNECTED( peer->state ); +} /* Number of connections to peers * If a peer's IP is set, we try to reconnect, when the connection drops @@ -266,12 +289,8 @@ static void handle_reconnects( void ) { io_setcookie(newfd,g_connections+i); /* Prepare connection info block */ - free( g_connections[i].indata ); - g_connections[i].indata = 0; - g_connections[i].indata_length = 0; + reset_info_block( g_connections+i ); g_connections[i].fd = newfd; - g_connections[i].tracker_id = 0; - iob_reset( &g_connections[i].outdata ); PROXYPEER_SETCONNECTING( g_connections[i].state ); } g_connection_reconn = time(NULL) + 30; @@ -305,16 +324,10 @@ static void handle_accept( int64 serversocket ) { } /* Prepare connection info block */ - free( g_connections[i].indata ); - g_connections[i].indata = 0; - g_connections[i].indata_length = 0; - g_connections[i].port = port; - g_connections[i].fd = newfd; - g_connections[i].tracker_id = 0; - iob_reset( &g_connections[i].outdata ); - g_connections[i].tracker_id = 0; - + reset_info_block( g_connections+i ); PROXYPEER_SETCONNECTING( g_connections[i].state ); + g_connections[i].port = port; + g_connections[i].fd = newfd; io_setcookie( newfd, g_connections + i ); @@ -328,19 +341,25 @@ static void handle_accept( int64 serversocket ) { /* New sync data on the stream */ static void handle_read( int64 peersocket ) { + int i; + int64 datalen; uint32_t tracker_id; proxy_peer *peer = io_getcookie( peersocket ); + if( !peer ) { /* Can't happen ;) */ - close( peersocket ); + io_close( peersocket ); return; } switch( peer->state & FLAG_MASK ) { - case FLAG_DISCONNECTED: break; /* Shouldnt happen */ + case FLAG_DISCONNECTED: + io_close( peersocket ); + break; /* Shouldnt happen */ case FLAG_CONNECTING: case FLAG_WAITTRACKERID: - /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) */ - if( io_tryread( peersocket, &tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) + /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) + This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ + if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) goto close_socket; /* See, if we already have a connection to that peer */ @@ -363,12 +382,20 @@ static void handle_read( int64 peersocket ) { break; close_socket: io_close( peersocket ); - PROXYPEER_SETDISCONNECTED( peer->state ); + reset_info_block( peer ); break; case FLAG_CONNECTED: - + /* Here we acutally expect data from peer + indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ + datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); + if( !datalen || datalen < -1 ) { + io_close( peersocket ); + reset_info_block( peer ); + } else if( datalen > 0 ) { + peer->indata_length += datalen; + process_indata( peer ); + } break; - } } @@ -377,13 +404,22 @@ static void handle_write( int64 peersocket ) { proxy_peer *peer = io_getcookie( peersocket ); if( !peer ) { /* Can't happen ;) */ - close( peersocket ); + io_close( peersocket ); return; } switch( peer->state & FLAG_MASK ) { - case FLAG_DISCONNECTED: break; /* Shouldnt happen */ + case FLAG_DISCONNECTED: + default: /* Should not happen */ + io_close( peersocket ); + break; case FLAG_CONNECTING: + /* Ensure that the connection is established and handle connection error */ + if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { + io_close( peersocket ); + break; + } + io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); PROXYPEER_SETWAITTRACKERID( peer->state ); io_dontwantwrite( peersocket ); @@ -396,15 +432,12 @@ static void handle_write( int64 peersocket ) { break; case -3: /* an error occured */ io_close( peersocket ); - PROXYPEER_SETDISCONNECTED( peer->state ); - iob_reset( &peer->outdata ); - free( peer->indata ); + reset_info_block( peer ); + break; default: /* Normal operation or eagain */ break; } break; - default: - break; } return; @@ -414,6 +447,14 @@ static void server_mainloop() { int64 sock; tai6464 now; + /* inlined livesync_init() */ + memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); + g_peerbuffer_pos = g_peerbuffer_start; + memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); + uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); + g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); + g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; + while(1) { /* See, if we need to connect to anyone */ if( time(NULL) > g_connection_reconn ) @@ -436,6 +477,8 @@ static void server_mainloop() { /* Loop over writable sockets */ while( ( sock = io_canwrite( ) ) != -1 ) handle_write( sock ); + + livesync_ticker( ); } } @@ -600,6 +643,68 @@ unlock_continue: return 0; } +static void livesync_issue_peersync( ) { + socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, + groupip_1, LIVESYNC_PORT); + g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); + g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; +} + +void livesync_ticker( ) { + /* livesync_issue_peersync sets g_next_packet_time */ + if( time(NULL) > g_next_packet_time && + g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) + livesync_issue_peersync(); +} + +static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { + *g_peerbuffer_pos = prefix; + memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); + memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); + + g_peerbuffer_pos += sizeof(ot_hash) + sizeof(ot_peer); + + if( g_peerbuffer_pos >= g_peerbuffer_highwater ) + livesync_issue_peersync(); +} + +static void process_indata( proxy_peer * peer ) { + int ensuremem, consumed, peers; + uint8_t *data = peer->indata, *hash; + uint8_t *dataend = data + peer->indata_length; + + while( 1 ) { + /* If we're not inside of a packet, make a new one */ + if( !peer->packet_tcount ) { + /* Ensure the header is complete or postpone processing */ + if( data + 8 > dataend ) break; + memcpy( &peer->packet_tid, data, sizeof(peer->packet_tid) ); + peer->packet_type = data[4]; + peer->packet_tprefix = data[5]; + peer->packet_tcount = data[6] * 256 + data[7]; + data += 8; + } + + /* ensure size for the complete torrent block */ + if( data + 26 > dataend ) break; + peers = peer->packet_type ? peer->packet_type : data[19]; + ensuremem = 19 + ( peer->packet_type == 0 ) + 7 * peers; + if( data + ensuremem > dataend ) break; + + hash = data; + data += 19 + ( peer->packet_type == 0 ); + + while( peers-- ) { + livesync_proxytell( peer->packet_tprefix, hash, data ); + data += 7; + } + } + + consumed = data - peer->indata; + memmove( peer->indata, data, peer->indata_length - consumed ); + peer->indata_length -= consumed; +} + static void * livesync_worker( void * args ) { (void)args; while( 1 ) { -- cgit v1.2.3