summaryrefslogtreecommitdiff
path: root/proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.c')
-rw-r--r--proxy.c92
1 files changed, 79 insertions, 13 deletions
diff --git a/proxy.c b/proxy.c
index cdbdec5..2ccffb4 100644
--- a/proxy.c
+++ b/proxy.c
@@ -211,7 +211,7 @@ static void livesync_handle_peersync( ssize_t datalen ) {
211} 211}
212 212
213int usage( char *self ) { 213int usage( char *self ) {
214 fprintf( stderr, "Usage: %s -i ip -p port\n", self ); 214 fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self );
215 return 0; 215 return 0;
216} 216}
217 217
@@ -274,6 +274,7 @@ static void handle_reconnects( void ) {
274 for( i=0; i<g_connection_count; ++i ) 274 for( i=0; i<g_connection_count; ++i )
275 if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { 275 if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
276 int64 newfd = socket_tcp6( ); 276 int64 newfd = socket_tcp6( );
277 fprintf( stderr, "(Re)connecting to peer..." );
277 if( newfd < 0 ) continue; /* No socket for you */ 278 if( newfd < 0 ) continue; /* No socket for you */
278 io_fd(newfd); 279 io_fd(newfd);
279 if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { 280 if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
@@ -389,6 +390,7 @@ close_socket:
389 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ 390 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
390 datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); 391 datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
391 if( !datalen || datalen < -1 ) { 392 if( !datalen || datalen < -1 ) {
393 fprintf( stderr, "Connection closed by remote peer.\n" );
392 io_close( peersocket ); 394 io_close( peersocket );
393 reset_info_block( peer ); 395 reset_info_block( peer );
394 } else if( datalen > 0 ) { 396 } else if( datalen > 0 ) {
@@ -402,6 +404,7 @@ close_socket:
402/* Can write new sync data to the stream */ 404/* Can write new sync data to the stream */
403static void handle_write( int64 peersocket ) { 405static void handle_write( int64 peersocket ) {
404 proxy_peer *peer = io_getcookie( peersocket ); 406 proxy_peer *peer = io_getcookie( peersocket );
407
405 if( !peer ) { 408 if( !peer ) {
406 /* Can't happen ;) */ 409 /* Can't happen ;) */
407 io_close( peersocket ); 410 io_close( peersocket );
@@ -416,25 +419,32 @@ static void handle_write( int64 peersocket ) {
416 case FLAG_CONNECTING: 419 case FLAG_CONNECTING:
417 /* Ensure that the connection is established and handle connection error */ 420 /* Ensure that the connection is established and handle connection error */
418 if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { 421 if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
422 fprintf( stderr, "failed\n" );
423 reset_info_block( peer );
419 io_close( peersocket ); 424 io_close( peersocket );
420 break; 425 break;
421 } 426 }
422 427
423 io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ); 428 io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) );
424 PROXYPEER_SETWAITTRACKERID( peer->state ); 429 PROXYPEER_SETWAITTRACKERID( peer->state );
430 fprintf( stderr, " succeeded.\n" );
431
425 io_dontwantwrite( peersocket ); 432 io_dontwantwrite( peersocket );
426 io_wantread( peersocket ); 433 io_wantread( peersocket );
427 break; 434 break;
428 case FLAG_CONNECTED: 435 case FLAG_CONNECTED:
429 switch( iob_send( peersocket, &peer->outdata ) ) { 436 switch( iob_send( peersocket, &peer->outdata ) ) {
430 case 0: /* all data sent */ 437 case 0: /* all data sent */
438 fprintf( stderr, "EMPTY\n" );
431 io_dontwantwrite( peersocket ); 439 io_dontwantwrite( peersocket );
432 break; 440 break;
433 case -3: /* an error occured */ 441 case -3: /* an error occured */
442 fprintf( stderr, "ERROR\n" );
434 io_close( peersocket ); 443 io_close( peersocket );
435 reset_info_block( peer ); 444 reset_info_block( peer );
436 break; 445 break;
437 default: /* Normal operation or eagain */ 446 default: /* Normal operation or eagain */
447 fprintf( stderr, "EGAIN\n" );
438 break; 448 break;
439 } 449 }
440 break; 450 break;
@@ -445,7 +455,6 @@ static void handle_write( int64 peersocket ) {
445 455
446static void server_mainloop() { 456static void server_mainloop() {
447 int64 sock; 457 int64 sock;
448 tai6464 now;
449 458
450 /* inlined livesync_init() */ 459 /* inlined livesync_init() */
451 memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); 460 memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
@@ -461,9 +470,7 @@ static void server_mainloop() {
461 handle_reconnects( ); 470 handle_reconnects( );
462 471
463 /* Wait for io events until next approx reconn check time */ 472 /* Wait for io events until next approx reconn check time */
464 taia_now( &now ); 473 io_waituntil2( 30*1000 );
465 taia_addsec( &now, &now, 30 );
466 io_waituntil( now );
467 474
468 /* Loop over readable sockets */ 475 /* Loop over readable sockets */
469 while( ( sock = io_canread( ) ) != -1 ) { 476 while( ( sock = io_canread( ) ) != -1 ) {
@@ -482,31 +489,90 @@ static void server_mainloop() {
482 } 489 }
483} 490}
484 491
492static void panic( const char *routine ) {
493 fprintf( stderr, "%s: %s\n", routine, strerror(errno) );
494 exit( 111 );
495}
496
497static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) {
498 int64 sock = socket_tcp6( );
499
500 if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 )
501 panic( "socket_bind6_reuse" );
502
503 if( socket_listen( sock, SOMAXCONN) == -1 )
504 panic( "socket_listen" );
505
506 if( !io_fd( sock ) )
507 panic( "io_fd" );
508
509 io_setcookie( sock, (void*)FLAG_SERVERSOCKET );
510 io_wantread( sock );
511 return sock;
512}
513
514
515static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
516 const char *s = src;
517 int off, bracket = 0;
518 while( isspace(*s) ) ++s;
519 if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */
520 if( !(off = scan_ip6( s, ip ) ) )
521 return 0;
522 s += off;
523 if( *s == 0 || isspace(*s)) return s-src;
524 if( *s == ']' && bracket ) ++s;
525 if( !ip6_isv4mapped(ip)){
526 if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0;
527 s++;
528 } else {
529 if( *(s++) != ':' ) return 0;
530 }
531 if( !(off = scan_ushort (s, port ) ) )
532 return 0;
533 return off+s-src;
534}
535
485int main( int argc, char **argv ) { 536int main( int argc, char **argv ) {
486 static pthread_t sync_in_thread_id; 537 static pthread_t sync_in_thread_id;
487 static pthread_t sync_out_thread_id; 538 static pthread_t sync_out_thread_id;
488 ot_ip6 serverip; 539 ot_ip6 serverip;
489 uint16_t tmpport; 540 uint16_t tmpport;
490 int scanon = 1, bound = 0; 541 int scanon = 1, lbound = 0, sbound = 0;
491 542
492 srandom( time(NULL) ); 543 srandom( time(NULL) );
493 g_tracker_id = random(); 544 g_tracker_id = random();
545 noipv6=1;
494 546
495 while( scanon ) { 547 while( scanon ) {
496 switch( getopt( argc, argv, ":i:p:vh" ) ) { 548 switch( getopt( argc, argv, ":l:c:L:h" ) ) {
497 case -1: scanon = 0; break; 549 case -1: scanon = 0; break;
498 case 'S': 550 case 'l':
499 if( !scan_ip6( optarg, serverip )) { usage( argv[0] ); exit( 1 ); } 551 tmpport = 0;
552 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
553 ot_try_bind( serverip, tmpport );
554 ++sbound;
555 break;
556 case 'c':
557 if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" );
558 tmpport = 0;
559 if( !scan_ip6_port( optarg,
560 g_connections[g_connection_count].ip,
561 &g_connections[g_connection_count].port ) ||
562 !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); }
563 g_connections[g_connection_count++].state = FLAG_OUTGOING;
500 break; 564 break;
501 case 'p': 565 case 'L':
502 if( !scan_ushort( optarg, &tmpport)) { usage( argv[0] ); exit( 1 ); } 566 tmpport = 9696;
503 livesync_bind_mcast( serverip, tmpport); bound++; break; 567 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
568 livesync_bind_mcast( serverip, tmpport); ++lbound; break;
504 default: 569 default:
505 case '?': usage( argv[0] ); exit( 1 ); 570 case '?': usage( argv[0] ); exit( 1 );
506 } 571 }
507 } 572 }
508 573
509 if( !bound ) exerr( "No port bound." ); 574 if( !lbound ) exerr( "No livesync port bound." );
575 if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." );
510 pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); 576 pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
511 pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); 577 pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
512 578