From b1606fd37ead0b92c81cb22a345384120b31affc Mon Sep 17 00:00:00 2001 From: Dirk Engling Date: Mon, 15 Apr 2024 00:44:16 +0200 Subject: clang-format --- proxy.c | 828 ++++++++++++++++++++++++++++++++++------------------------------ 1 file changed, 434 insertions(+), 394 deletions(-) diff --git a/proxy.c b/proxy.c index c25611b..9946240 100644 --- a/proxy.c +++ b/proxy.c @@ -4,33 +4,33 @@ $Id$ */ /* System */ +#include +#include +#include +#include +#include +#include #include +#include #include #include -#include #include #include -#include -#include -#include -#include -#include -#include /* Libowfat */ -#include "socket.h" +#include "byte.h" #include "io.h" #include "iob.h" -#include "byte.h" -#include "scan.h" #include "ip6.h" #include "ndelay.h" +#include "scan.h" +#include "socket.h" /* Opentracker */ -#include "trackerlogic.h" -#include "ot_vector.h" #include "ot_mutex.h" #include "ot_stats.h" +#include "ot_vector.h" +#include "trackerlogic.h" #ifndef WANT_SYNC_LIVE #define WANT_SYNC_LIVE @@ -40,26 +40,26 @@ ot_ip6 g_serverip; uint16_t g_serverport = 9009; uint32_t g_tracker_id; -char groupip_1[4] = { 224,0,23,5 }; +char groupip_1[4] = {224, 0, 23, 5}; int g_self_pipe[2]; /* If you have more than 10 peers, don't use this proxy Use 20 slots for 10 peers to have room for 10 incoming connection slots */ -#define MAX_PEERS 20 +#define MAX_PEERS 20 -#define LIVESYNC_INCOMING_BUFFSIZE (256*256) -#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) +#define LIVESYNC_INCOMING_BUFFSIZE (256 * 256) +#define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256) -#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 -#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) -#define LIVESYNC_MAXDELAY 15 /* seconds */ +#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 +#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash)) +#define LIVESYNC_MAXDELAY 15 /* seconds */ /* The amount of time a complete sync cycle should take */ -#define OT_SYNC_INTERVAL_MINUTES 2 +#define OT_SYNC_INTERVAL_MINUTES 2 /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ -#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) +#define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT)) enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; enum { FLAG_SERVERSOCKET = 1 }; @@ -75,151 +75,153 @@ static uint8_t *g_peerbuffer_pos; static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; static ot_time g_next_packet_time; -static void * livesync_worker( void * args ); -static void * streamsync_worker( void * args ); -static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); +static void *livesync_worker(void *args); +static void *streamsync_worker(void *args); +static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer); -void exerr( char * message ) { - fprintf( stderr, "%s\n", message ); - exit( 111 ); +void exerr(char *message) { + fprintf(stderr, "%s\n", message); + exit(111); } -void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { - (void) event; - (void) proto; - (void) event_data; +void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) { + (void)event; + (void)proto; + (void)event_data; } -void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { - char tmpip[4] = {0,0,0,0}; +void livesync_bind_mcast(ot_ip6 ip, uint16_t port) { + char tmpip[4] = {0, 0, 0, 0}; char *v4ip; - if( !ip6_isv4mapped(ip)) + if (!ip6_isv4mapped(ip)) exerr("v6 mcast support not yet available."); - v4ip = ip+12; + v4ip = ip + 12; - if( g_socket_in != -1 ) + if (g_socket_in != -1) exerr("Error: Livesync listen ip specified twice."); - if( ( g_socket_in = socket_udp4( )) < 0) - exerr("Error: Cant create live sync incoming socket." ); + if ((g_socket_in = socket_udp4()) < 0) + exerr("Error: Cant create live sync incoming socket."); ndelay_off(g_socket_in); - if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) - exerr("Error: Cant bind live sync incoming socket." ); + if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1) + exerr("Error: Cant bind live sync incoming socket."); - if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) + if (socket_mcjoin4(g_socket_in, groupip_1, v4ip)) exerr("Error: Cant make live sync incoming socket join mcast group."); - if( ( g_socket_out = socket_udp4()) < 0) - exerr("Error: Cant create live sync outgoing socket." ); - if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) - exerr("Error: Cant bind live sync outgoing socket." ); + if ((g_socket_out = socket_udp4()) < 0) + exerr("Error: Cant create live sync outgoing socket."); + if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1) + exerr("Error: Cant bind live sync outgoing socket."); socket_mcttl4(g_socket_out, 1); socket_mcloop4(g_socket_out, 1); } -size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { +size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { int exactmatch; ot_torrent *torrent; ot_peerlist *peer_list; ot_peer *peer_dest; - ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); - size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); + ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); + size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); - torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), compare_size, &exactmatch ); - if( !torrent ) + torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch); + if (!torrent) return -1; - if( !exactmatch ) { + if (!exactmatch) { /* Create a new torrent entry, then */ - memcpy( torrent->hash, hash, sizeof(ot_hash) ); + memcpy(torrent->hash, hash, sizeof(ot_hash)); - if( !( torrent->peer_list6 = malloc( sizeof (ot_peerlist) ) ) || - !( torrent->peer_list4 = malloc( sizeof (ot_peerlist) ) ) ) { - vector_remove_torrent( torrents_list, torrent ); - mutex_bucket_unlock_by_hash( hash, 0 ); + if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) { + vector_remove_torrent(torrents_list, torrent); + mutex_bucket_unlock_by_hash(hash, 0); return -1; } - byte_zero( torrent->peer_list6, sizeof( ot_peerlist ) ); - byte_zero( torrent->peer_list4, sizeof( ot_peerlist ) ); + byte_zero(torrent->peer_list6, sizeof(ot_peerlist)); + byte_zero(torrent->peer_list4, sizeof(ot_peerlist)); } peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; /* Check for peer in torrent */ - peer_dest = vector_find_or_insert_peer( &(peer_list->peers), peer, peer_size, &exactmatch ); - if( !peer_dest ) { - mutex_bucket_unlock_by_hash( hash, 0 ); + peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch); + if (!peer_dest) { + mutex_bucket_unlock_by_hash(hash, 0); return -1; } /* Tell peer that it's fresh */ - OT_PEERTIME( peer, peer_size ) = 0; + OT_PEERTIME(peer, peer_size) = 0; /* If we hadn't had a match create peer there */ - if( !exactmatch ) { + if (!exactmatch) { peer_list->peer_count++; - if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING ) + if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING) peer_list->seed_count++; } - memcpy( peer_dest, peer, peer_size ); - mutex_bucket_unlock_by_hash( hash, 0 ); + memcpy(peer_dest, peer, peer_size); + mutex_bucket_unlock_by_hash(hash, 0); return 0; } -size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { - int exactmatch; - ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); - ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); +size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { + int exactmatch; + ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); + ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch); - if( exactmatch ) { + if (exactmatch) { ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; - switch( vector_remove_peer( &peer_list->peers, peer, peer_size ) ) { - case 2: peer_list->seed_count--; /* Intentional fallthrough */ - case 1: peer_list->peer_count--; /* Intentional fallthrough */ - default: break; + switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) { + case 2: + peer_list->seed_count--; /* Intentional fallthrough */ + case 1: + peer_list->peer_count--; /* Intentional fallthrough */ + default: + break; } } - mutex_bucket_unlock_by_hash( hash, 0 ); + mutex_bucket_unlock_by_hash(hash, 0); return 0; } -void free_peerlist( ot_peerlist *peer_list ) { - if( peer_list->peers.data ) { - if( OT_PEERLIST_HASBUCKETS( peer_list ) ) { - ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data); +void free_peerlist(ot_peerlist *peer_list) { + if (peer_list->peers.data) { + if (OT_PEERLIST_HASBUCKETS(peer_list)) { + ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data); - while( peer_list->peers.size-- ) - free( bucket_list++->data ); + while (peer_list->peers.size--) + free(bucket_list++->data); } - free( peer_list->peers.data ); + free(peer_list->peers.data); } - free( peer_list ); + free(peer_list); } -static void livesync_handle_peersync( ssize_t datalen, size_t peer_size ) { - int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); +static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) { + int off = sizeof(g_tracker_id) + sizeof(uint32_t); - fprintf( stderr, "." ); + fprintf(stderr, "."); - while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= datalen ) { - ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); - ot_hash *hash = (ot_hash*)(g_inbuffer + off); + while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) { + ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash)); + ot_hash *hash = (ot_hash *)(g_inbuffer + off); - if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED ) - remove_peer_from_torrent_proxy( *hash, peer, peer_size ); + if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED) + remove_peer_from_torrent_proxy(*hash, peer, peer_size); else - add_peer_to_torrent_proxy( *hash, peer, peer_size ); + add_peer_to_torrent_proxy(*hash, peer, peer_size); - off += sizeof( ot_hash ) + peer_size; + off += sizeof(ot_hash) + peer_size; } } -int usage( char *self ) { - fprintf( stderr, "Usage: %s -L -l : -c :\n", self ); +int usage(char *self) { + fprintf(stderr, "Usage: %s -L -l : -c :\n", self); return 0; } @@ -234,115 +236,115 @@ enum { FLAG_MASK = 0x07 }; -#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) -#define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED) -#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) -#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) -#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) -#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) +#define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING) +#define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED) +#define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED) +#define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING) +#define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID) +#define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED) typedef struct { - int state; /* Whether we want to connect, how far our handshake is, etc. */ - ot_ip6 ip; /* The peer to connect to */ - uint16_t port; /* The peers port */ - uint8_t indata[8192*16]; /* Any data not processed yet */ - size_t indata_length; /* Length of unprocessed data */ - uint32_t tracker_id; /* How the other end greeted */ - int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ - io_batch outdata; /* The iobatch containing our sync data */ - - size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ - uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ - uint8_t packet_type; /* Type of current packet */ - uint32_t packet_tid; /* Tracker id for current packet */ + int state; /* Whether we want to connect, how far our handshake is, etc. */ + ot_ip6 ip; /* The peer to connect to */ + uint16_t port; /* The peers port */ + uint8_t indata[8192 * 16]; /* Any data not processed yet */ + size_t indata_length; /* Length of unprocessed data */ + uint32_t tracker_id; /* How the other end greeted */ + int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ + io_batch outdata; /* The iobatch containing our sync data */ + + size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ + uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ + uint8_t packet_type; /* Type of current packet */ + uint32_t packet_tid; /* Tracker id for current packet */ } proxy_peer; -static void process_indata( proxy_peer * peer ); +static void process_indata(proxy_peer *peer); -void reset_info_block( proxy_peer * peer ) { +void reset_info_block(proxy_peer *peer) { peer->indata_length = 0; peer->tracker_id = 0; peer->fd = -1; peer->packet_tcount = 0; - iob_reset( &peer->outdata ); - PROXYPEER_SETDISCONNECTED( peer->state ); + iob_reset(&peer->outdata); + PROXYPEER_SETDISCONNECTED(peer->state); } /* Number of connections to peers - * If a peer's IP is set, we try to reconnect, when the connection drops - * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it - * Multiple connections to/from the same ip are okay, if tracker_id doesn't match - * Reconnect attempts occur only twice a minute -*/ + * If a peer's IP is set, we try to reconnect, when the connection drops + * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it + * Multiple connections to/from the same ip are okay, if tracker_id doesn't match + * Reconnect attempts occur only twice a minute + */ static int g_connection_count; static ot_time g_connection_reconn; static proxy_peer g_connections[MAX_PEERS]; -static void handle_reconnects( void ) { +static void handle_reconnects(void) { int i; - for( i=0; istate & FLAG_MASK ) { + switch (peer->state & FLAG_MASK) { case FLAG_DISCONNECTED: - io_close( peersocket ); + io_close(peersocket); break; /* Shouldnt happen */ case FLAG_CONNECTING: case FLAG_WAITTRACKERID: /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ - if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) + if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id)) goto close_socket; /* See, if we already have a connection to that peer */ - for( i=0; istate == FLAG_CONNECTING ) - if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) ) + if (peer->state == FLAG_CONNECTING) + if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id)) goto close_socket; peer->tracker_id = tracker_id; - PROXYPEER_SETCONNECTED( peer->state ); + PROXYPEER_SETCONNECTED(peer->state); - if( peer->state & FLAG_OUTGOING ) - fprintf( stderr, "succeeded.\n" ); + if (peer->state & FLAG_OUTGOING) + fprintf(stderr, "succeeded.\n"); else - fprintf( stderr, "Incoming connection successful.\n" ); + fprintf(stderr, "Incoming connection successful.\n"); break; -close_socket: - fprintf( stderr, "Handshake incomplete, closing socket\n" ); - io_close( peersocket ); - reset_info_block( peer ); + close_socket: + fprintf(stderr, "Handshake incomplete, closing socket\n"); + io_close(peersocket); + reset_info_block(peer); break; case FLAG_CONNECTED: /* Here we acutally expect data from peer indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ - datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); - if( !datalen || datalen < -1 ) { - fprintf( stderr, "Connection closed by remote peer.\n" ); - io_close( peersocket ); - reset_info_block( peer ); - } else if( datalen > 0 ) { + datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length); + if (!datalen || datalen < -1) { + fprintf(stderr, "Connection closed by remote peer.\n"); + io_close(peersocket); + reset_info_block(peer); + } else if (datalen > 0) { peer->indata_length += datalen; - process_indata( peer ); + process_indata(peer); } break; } } /* Can write new sync data to the stream */ -static void handle_write( int64 peersocket ) { - proxy_peer *peer = io_getcookie( peersocket ); +static void handle_write(int64 peersocket) { + proxy_peer *peer = io_getcookie(peersocket); - if( !peer ) { + if (!peer) { /* Can't happen ;) */ - io_close( peersocket ); + io_close(peersocket); return; } - switch( peer->state & FLAG_MASK ) { + switch (peer->state & FLAG_MASK) { case FLAG_DISCONNECTED: default: /* Should not happen */ - io_close( peersocket ); + io_close(peersocket); break; case FLAG_CONNECTING: /* Ensure that the connection is established and handle connection error */ - if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { - fprintf( stderr, "failed\n" ); - reset_info_block( peer ); - io_close( peersocket ); - break; + if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) { + fprintf(stderr, "failed\n"); + reset_info_block(peer); + io_close(peersocket); + break; } - if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) { - PROXYPEER_SETWAITTRACKERID( peer->state ); - io_dontwantwrite( peersocket ); - io_wantread( peersocket ); + if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) { + PROXYPEER_SETWAITTRACKERID(peer->state); + io_dontwantwrite(peersocket); + io_wantread(peersocket); } else { - fprintf( stderr, "Handshake incomplete, closing socket\n" ); - io_close( peersocket ); - reset_info_block( peer ); + fprintf(stderr, "Handshake incomplete, closing socket\n"); + io_close(peersocket); + reset_info_block(peer); } break; case FLAG_CONNECTED: - switch( iob_send( peersocket, &peer->outdata ) ) { + switch (iob_send(peersocket, &peer->outdata)) { case 0: /* all data sent */ - io_dontwantwrite( peersocket ); + io_dontwantwrite(peersocket); break; case -3: /* an error occured */ - io_close( peersocket ); - reset_info_block( peer ); + io_close(peersocket); + reset_info_block(peer); break; default: /* Normal operation or eagain */ break; @@ -475,289 +476,324 @@ static void server_mainloop() { int64 sock; /* inlined livesync_init() */ - memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); + memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start)); g_peerbuffer_pos = g_peerbuffer_start; - memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); - g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); - g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; + memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id)); + uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER); + g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t); + g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; - while(1) { + while (1) { /* See if we need to connect to anyone */ - if( time(NULL) > g_connection_reconn ) - handle_reconnects( ); + if (time(NULL) > g_connection_reconn) + handle_reconnects(); /* Wait for io events until next approx reconn check time */ - io_waituntil2( 30*1000 ); + io_waituntil2(30 * 1000); /* Loop over readable sockets */ - while( ( sock = io_canread( ) ) != -1 ) { - const void *cookie = io_getcookie( sock ); - if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) - handle_accept( sock ); + while ((sock = io_canread()) != -1) { + const void *cookie = io_getcookie(sock); + if ((uintptr_t)cookie == FLAG_SERVERSOCKET) + handle_accept(sock); else - handle_read( sock ); + handle_read(sock); } /* Loop over writable sockets */ - while( ( sock = io_canwrite( ) ) != -1 ) - handle_write( sock ); + while ((sock = io_canwrite()) != -1) + handle_write(sock); - livesync_ticker( ); + livesync_ticker(); } } -static void panic( const char *routine ) { - fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); - exit( 111 ); +static void panic(const char *routine) { + fprintf(stderr, "%s: %s\n", routine, strerror(errno)); + exit(111); } -static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { - int64 sock = socket_tcp6( ); +static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) { + int64 sock = socket_tcp6(); - if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) - panic( "socket_bind6_reuse" ); + if (socket_bind6_reuse(sock, ip, port, 0) == -1) + panic("socket_bind6_reuse"); - if( socket_listen( sock, SOMAXCONN) == -1 ) - panic( "socket_listen" ); + if (socket_listen(sock, SOMAXCONN) == -1) + panic("socket_listen"); - if( !io_fd( sock ) ) - panic( "io_fd" ); + if (!io_fd(sock)) + panic("io_fd"); - io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); - io_wantread( sock ); + io_setcookie(sock, (void *)FLAG_SERVERSOCKET); + io_wantread(sock); return sock; } - -static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) { +static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) { const char *s = src; - int off, bracket = 0; - while( isspace(*s) ) ++s; - if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ - if( !(off = scan_ip6( s, ip ) ) ) + int off, bracket = 0; + while (isspace(*s)) + ++s; + if (*s == '[') + ++s, ++bracket; /* for v6 style notation */ + if (!(off = scan_ip6(s, ip))) return 0; s += off; - if( *s == 0 || isspace(*s)) return s-src; - if( *s == ']' && bracket ) ++s; - if( !ip6_isv4mapped(ip)){ - if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; + if (*s == 0 || isspace(*s)) + return s - src; + if (*s == ']' && bracket) + ++s; + if (!ip6_isv4mapped(ip)) { + if ((bracket && *(s) != ':') || (*(s) != '.')) + return 0; s++; } else { - if( *(s++) != ':' ) return 0; + if (*(s++) != ':') + return 0; } - if( !(off = scan_ushort (s, port ) ) ) - return 0; - return off+s-src; + if (!(off = scan_ushort(s, port))) + return 0; + return off + s - src; } -int main( int argc, char **argv ) { +int main(int argc, char **argv) { static pthread_t sync_in_thread_id; static pthread_t sync_out_thread_id; - ot_ip6 serverip; - uint16_t tmpport; - int scanon = 1, lbound = 0, sbound = 0; + ot_ip6 serverip; + uint16_t tmpport; + int scanon = 1, lbound = 0, sbound = 0; - srandom( time(NULL) ); + srandom(time(NULL)); #ifdef WANT_ARC4RANDOM g_tracker_id = arc4random(); #else g_tracker_id = random(); #endif - while( scanon ) { - switch( getopt( argc, argv, ":l:c:L:h" ) ) { - case -1: scanon = 0; break; + while (scanon) { + switch (getopt(argc, argv, ":l:c:L:h")) { + case -1: + scanon = 0; + break; case 'l': tmpport = 0; - if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } - ot_try_bind( serverip, tmpport ); + if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { + usage(argv[0]); + exit(1); + } + ot_try_bind(serverip, tmpport); ++sbound; break; case 'c': - if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); + if (g_connection_count > MAX_PEERS / 2) + exerr("Connection limit exceeded.\n"); tmpport = 0; - if( !scan_ip6_port( optarg, - g_connections[g_connection_count].ip, - &g_connections[g_connection_count].port ) || - !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } + if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) { + usage(argv[0]); + exit(1); + } g_connections[g_connection_count++].state = FLAG_OUTGOING; break; case 'L': tmpport = 9696; - if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } - livesync_bind_mcast( serverip, tmpport); ++lbound; break; + if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { + usage(argv[0]); + exit(1); + } + livesync_bind_mcast(serverip, tmpport); + ++lbound; + break; default: - case '?': usage( argv[0] ); exit( 1 ); + case '?': + usage(argv[0]); + exit(1); } } - if( !lbound ) exerr( "No livesync port bound." ); - if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); - pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); - pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); + if (!lbound) + exerr("No livesync port bound."); + if (!g_connection_count && !sbound) + exerr("No streamsync port bound."); + pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL); + pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL); server_mainloop(); return 0; } -static void * streamsync_worker( void * args ) { +static void *streamsync_worker(void *args) { (void)args; - while( 1 ) { + while (1) { int bucket; /* For each bucket... */ - for( bucket=0; bucketsize ) goto unlock_continue; + if (!torrents_list->size) + goto unlock_continue; /* For each torrent in this bucket.. */ - for( tor_offset=0; tor_offsetsize; ++tor_offset ) { + for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { /* Address torrents members */ - ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; - switch( peer_list->peer_count ) { - case 2: count_two++; break; - case 1: count_one++; break; - case 0: break; - default: count_def++; - count_peers += peer_list->peer_count; + ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list; + switch (peer_list->peer_count) { + case 2: + count_two++; + break; + case 1: + count_one++; + break; + case 0: + break; + default: + count_def++; + count_peers += peer_list->peer_count; } } /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ - mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) + - ( count_one + 2 * count_two + count_peers ) * 7; - - fprintf( stderr, "Mem: %zd\n", mem ); - - ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); - if( !ptr ) goto unlock_continue; - - if( count_one > 4 || !count_def ) { - mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 ); - ptr_b += mem_a; ptr_c += mem_a; - ptr_a[0] = 1; /* Offset 0: packet type 1 */ - ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ - ptr_a[2] = count_one >> 8; - ptr_a[3] = count_one & 255; - ptr_a += 4; + mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7; + + fprintf(stderr, "Mem: %zd\n", mem); + + ptr = ptr_a = ptr_b = ptr_c = malloc(mem); + if (!ptr) + goto unlock_continue; + + if (count_one > 4 || !count_def) { + mem_a = 1 + 1 + 2 + count_one * (19 + 7); + ptr_b += mem_a; + ptr_c += mem_a; + ptr_a[0] = 1; /* Offset 0: packet type 1 */ + ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ + ptr_a[2] = count_one >> 8; + ptr_a[3] = count_one & 255; + ptr_a += 4; } else count_def += count_one; - if( count_two > 4 || !count_def ) { - mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 ); - ptr_c += mem_b; - ptr_b[0] = 2; /* Offset 0: packet type 2 */ - ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ - ptr_b[2] = count_two >> 8; - ptr_b[3] = count_two & 255; - ptr_b += 4; + if (count_two > 4 || !count_def) { + mem_b = 1 + 1 + 2 + count_two * (19 + 14); + ptr_c += mem_b; + ptr_b[0] = 2; /* Offset 0: packet type 2 */ + ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ + ptr_b[2] = count_two >> 8; + ptr_b[3] = count_two & 255; + ptr_b += 4; } else count_def += count_two; - if( count_def ) { - ptr_c[0] = 0; /* Offset 0: packet type 0 */ - ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ - ptr_c[2] = count_def >> 8; - ptr_c[3] = count_def & 255; - ptr_c += 4; + if (count_def) { + ptr_c[0] = 0; /* Offset 0: packet type 0 */ + ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ + ptr_c[2] = count_def >> 8; + ptr_c[3] = count_def & 255; + ptr_c += 4; } /* For each torrent in this bucket.. */ - for( tor_offset=0; tor_offsetsize; ++tor_offset ) { + for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { /* Address torrents members */ - ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; + ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset; ot_peerlist *peer_list = torrent->peer_list; - ot_peer *peers = (ot_peer*)(peer_list->peers.data); - uint8_t **dst; + ot_peer *peers = (ot_peer *)(peer_list->peers.data); + uint8_t **dst; /* Determine destination slot */ count_peers = peer_list->peer_count; - switch( count_peers ) { - case 0: continue; - case 1: dst = mem_a ? &ptr_a : &ptr_c; break; - case 2: dst = mem_b ? &ptr_b : &ptr_c; break; - default: dst = &ptr_c; break; + switch (count_peers) { + case 0: + continue; + case 1: + dst = mem_a ? &ptr_a : &ptr_c; + break; + case 2: + dst = mem_b ? &ptr_b : &ptr_c; + break; + default: + dst = &ptr_c; + break; } /* Copy tail of info_hash, advance pointer */ - memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1); - *dst += sizeof( ot_hash ) - 1; + memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1); + *dst += sizeof(ot_hash) - 1; /* Encode peer count */ - if( dst == &ptr_c ) - while( count_peers ) { - if( count_peers <= 0x7f ) + if (dst == &ptr_c) + while (count_peers) { + if (count_peers <= 0x7f) *(*dst)++ = count_peers; else - *(*dst)++ = 0x80 | ( count_peers & 0x7f ); + *(*dst)++ = 0x80 | (count_peers & 0x7f); count_peers >>= 7; } /* Copy peers */ count_peers = peer_list->peer_count; - while( count_peers-- ) { - memcpy( *dst, peers++, OT_IP_SIZE + 3 ); + while (count_peers--) { + memcpy(*dst, peers++, OT_IP_SIZE + 3); *dst += OT_IP_SIZE + 3; } free_peerlist(peer_list); } - free( torrents_list->data ); - memset( torrents_list, 0, sizeof(*torrents_list ) ); -unlock_continue: - mutex_bucket_unlock( bucket, 0 ); + free(torrents_list->data); + memset(torrents_list, 0, sizeof(*torrents_list)); + unlock_continue: + mutex_bucket_unlock(bucket, 0); - if( ptr ) { + if (ptr) { int i; - if( ptr_b > ptr_c ) ptr_c = ptr_b; - if( ptr_a > ptr_c ) ptr_c = ptr_a; + if (ptr_b > ptr_c) + ptr_c = ptr_b; + if (ptr_a > ptr_c) + ptr_c = ptr_a; mem = ptr_c - ptr; - for( i=0; i < MAX_PEERS; ++i ) { - if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) { - void *tmp = malloc( mem ); - if( tmp ) { - memcpy( tmp, ptr, mem ); - iob_addbuf_free( &g_connections[i].outdata, tmp, mem ); - io_wantwrite( g_connections[i].fd ); + for (i = 0; i < MAX_PEERS; ++i) { + if (PROXYPEER_ISCONNECTED(g_connections[i].state)) { + void *tmp = malloc(mem); + if (tmp) { + memcpy(tmp, ptr, mem); + iob_addbuf_free(&g_connections[i].outdata, tmp, mem); + io_wantwrite(g_connections[i].fd); } } } - free( ptr ); + free(ptr); } - usleep( OT_SYNC_SLEEP ); + usleep(OT_SYNC_SLEEP); } } return 0; } -static void livesync_issue_peersync( ) { - socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, - groupip_1, LIVESYNC_PORT); - g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); +static void livesync_issue_peersync() { + socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT); + g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t); g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; } -void livesync_ticker( ) { +void livesync_ticker() { /* livesync_issue_peersync sets g_next_packet_time */ - if( time(NULL) > g_next_packet_time && - g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) + if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id)) livesync_issue_peersync(); } -static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { -// unsigned int i; +static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) { + // unsigned int i; *g_peerbuffer_pos = prefix; - memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); - memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); + memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1); + memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1); #if 0 /* Dump info_hash */ @@ -772,80 +808,84 @@ static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *pee #endif g_peerbuffer_pos += sizeof(ot_peer); - if( g_peerbuffer_pos >= g_peerbuffer_highwater ) + if (g_peerbuffer_pos >= g_peerbuffer_highwater) livesync_issue_peersync(); } -static void process_indata( proxy_peer * peer ) { - size_t consumed, peers; +static void process_indata(proxy_peer *peer) { + size_t consumed, peers; uint8_t *data = peer->indata, *hash; uint8_t *dataend = data + peer->indata_length; - while( 1 ) { + while (1) { /* If we're not inside of a packet, make a new one */ - if( !peer->packet_tcount ) { + if (!peer->packet_tcount) { /* Ensure the header is complete or postpone processing */ - if( data + 4 > dataend ) break; - peer->packet_type = data[0]; - peer->packet_tprefix = data[1]; - peer->packet_tcount = data[2] * 256 + data[3]; - data += 4; -printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount ); + if (data + 4 > dataend) + break; + peer->packet_type = data[0]; + peer->packet_tprefix = data[1]; + peer->packet_tcount = data[2] * 256 + data[3]; + data += 4; + printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount); } /* Ensure size for a minimal torrent block */ - if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break; + if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend) + break; /* Advance pointer to peer count or peers */ - hash = data; - data += sizeof(ot_hash) - 1; + hash = data; + data += sizeof(ot_hash) - 1; /* Type 0 has peer count encoded before each peers */ - peers = peer->packet_type; - if( !peers ) { + peers = peer->packet_type; + if (!peers) { int shift = 0; - do peers |= ( 0x7f & *data ) << ( 7 * shift ); - while ( *(data++) & 0x80 && shift++ < 6 ); + do + peers |= (0x7f & *data) << (7 * shift); + while (*(data++) & 0x80 && shift++ < 6); } #if 0 printf( "peers: %zd\n", peers ); #endif /* Ensure enough data being read to hold all peers */ - if( data + (OT_IP_SIZE + 3) * peers > dataend ) { + if (data + (OT_IP_SIZE + 3) * peers > dataend) { data = hash; break; } - while( peers-- ) { - livesync_proxytell( peer->packet_tprefix, hash, data ); + while (peers--) { + livesync_proxytell(peer->packet_tprefix, hash, data); data += OT_IP_SIZE + 3; } --peer->packet_tcount; } consumed = data - peer->indata; - memmove( peer->indata, data, peer->indata_length - consumed ); + memmove(peer->indata, data, peer->indata_length - consumed); peer->indata_length -= consumed; } -static void * livesync_worker( void * args ) { +static void *livesync_worker(void *args) { (void)args; - while( 1 ) { - ot_ip6 in_ip; uint16_t in_port; - size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); + while (1) { + ot_ip6 in_ip; + uint16_t in_port; + size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port); /* Expect at least tracker id and packet type */ - if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) + if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t))) continue; - if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { + if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) { /* drop packet coming from ourselves */ continue; } - switch( uint32_read_big( (char*)g_inbuffer + sizeof( g_tracker_id ) ) ) { + switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) { case OT_SYNC_PEER4: - livesync_handle_peersync( datalen, OT_PEER_SIZE4 ); + livesync_handle_peersync(datalen, OT_PEER_SIZE4); break; case OT_SYNC_PEER6: - livesync_handle_peersync( datalen, OT_PEER_SIZE6 ); + livesync_handle_peersync(datalen, OT_PEER_SIZE6); break; default: // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); -- cgit v1.2.3