From ed1673eb10e98145759b4e778511593fde3cbe3a Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Fri, 2 Oct 2009 23:34:42 +0000 Subject: Network connection code seems to be working now --- proxy.c | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 79 insertions(+), 13 deletions(-) diff --git a/proxy.c b/proxy.c index cdbdec5..2ccffb4 100644 --- a/proxy.c +++ b/proxy.c @@ -211,7 +211,7 @@ static void livesync_handle_peersync( ssize_t datalen ) { } int usage( char *self ) { - fprintf( stderr, "Usage: %s -i ip -p port\n", self ); + fprintf( stderr, "Usage: %s -L -l : -c :\n", self ); return 0; } @@ -274,6 +274,7 @@ static void handle_reconnects( void ) { for( i=0; iindata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); if( !datalen || datalen < -1 ) { + fprintf( stderr, "Connection closed by remote peer.\n" ); io_close( peersocket ); reset_info_block( peer ); } else if( datalen > 0 ) { @@ -402,6 +404,7 @@ close_socket: /* Can write new sync data to the stream */ static void handle_write( int64 peersocket ) { proxy_peer *peer = io_getcookie( peersocket ); + if( !peer ) { /* Can't happen ;) */ io_close( peersocket ); @@ -416,25 +419,32 @@ static void handle_write( int64 peersocket ) { case FLAG_CONNECTING: /* Ensure that the connection is established and handle connection error */ if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { + fprintf( stderr, "failed\n" ); + reset_info_block( peer ); io_close( peersocket ); break; } io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); PROXYPEER_SETWAITTRACKERID( peer->state ); + fprintf( stderr, " succeeded.\n" ); + io_dontwantwrite( peersocket ); io_wantread( peersocket ); break; case FLAG_CONNECTED: switch( iob_send( peersocket, &peer->outdata ) ) { case 0: /* all data sent */ + fprintf( stderr, "EMPTY\n" ); io_dontwantwrite( peersocket ); break; case -3: /* an error occured */ + fprintf( stderr, "ERROR\n" ); io_close( peersocket ); reset_info_block( peer ); break; default: /* Normal operation or eagain */ + fprintf( stderr, "EGAIN\n" ); break; } break; @@ -445,7 +455,6 @@ static void handle_write( int64 peersocket ) { static void server_mainloop() { int64 sock; - tai6464 now; /* inlined livesync_init() */ memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); @@ -461,9 +470,7 @@ static void server_mainloop() { handle_reconnects( ); /* Wait for io events until next approx reconn check time */ - taia_now( &now ); - taia_addsec( &now, &now, 30 ); - io_waituntil( now ); + io_waituntil2( 30*1000 ); /* Loop over readable sockets */ while( ( sock = io_canread( ) ) != -1 ) { @@ -482,31 +489,90 @@ static void server_mainloop() { } } +static void panic( const char *routine ) { + fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); + exit( 111 ); +} + +static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { + int64 sock = socket_tcp6( ); + + if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) + panic( "socket_bind6_reuse" ); + + if( socket_listen( sock, SOMAXCONN) == -1 ) + panic( "socket_listen" ); + + if( !io_fd( sock ) ) + panic( "io_fd" ); + + io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); + io_wantread( sock ); + return sock; +} + + +static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) { + const char *s = src; + int off, bracket = 0; + while( isspace(*s) ) ++s; + if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ + if( !(off = scan_ip6( s, ip ) ) ) + return 0; + s += off; + if( *s == 0 || isspace(*s)) return s-src; + if( *s == ']' && bracket ) ++s; + if( !ip6_isv4mapped(ip)){ + if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; + s++; + } else { + if( *(s++) != ':' ) return 0; + } + if( !(off = scan_ushort (s, port ) ) ) + return 0; + return off+s-src; +} + int main( int argc, char **argv ) { static pthread_t sync_in_thread_id; static pthread_t sync_out_thread_id; ot_ip6 serverip; uint16_t tmpport; - int scanon = 1, bound = 0; + int scanon = 1, lbound = 0, sbound = 0; srandom( time(NULL) ); g_tracker_id = random(); + noipv6=1; while( scanon ) { - switch( getopt( argc, argv, ":i:p:vh" ) ) { + switch( getopt( argc, argv, ":l:c:L:h" ) ) { case -1: scanon = 0; break; - case 'S': - if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } + case 'l': + tmpport = 0; + if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } + ot_try_bind( serverip, tmpport ); + ++sbound; + break; + case 'c': + if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); + tmpport = 0; + if( !scan_ip6_port( optarg, + g_connections[g_connection_count].ip, + &g_connections[g_connection_count].port ) || + !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } + g_connections[g_connection_count++].state = FLAG_OUTGOING; break; - case 'p': - if( !scan_ushort( optarg, &tmpport)) { usage( argv[0] ); exit( 1 ); } - livesync_bind_mcast( serverip, tmpport); bound++; break; + case 'L': + tmpport = 9696; + if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } + livesync_bind_mcast( serverip, tmpport); ++lbound; break; default: case '?': usage( argv[0] ); exit( 1 ); } } - if( !bound ) exerr( "No port bound." ); + if( !lbound ) exerr( "No livesync port bound." ); + if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); -- cgit v1.2.3