From a09609d94ed7f2cc8c5447f2c482abf5024b0ae5 Mon Sep 17 00:00:00 2001 From: Dirk Engling Date: Fri, 5 Apr 2024 03:26:42 +0200 Subject: Enable live syncing v6 peers --- ot_livesync.c | 86 ++++++++++++++++++++++++++++++++++++++--------------------- ot_livesync.h | 8 +++++- 2 files changed, 63 insertions(+), 31 deletions(-) diff --git a/ot_livesync.c b/ot_livesync.c index b87fa6d..335cce5 100644 --- a/ot_livesync.c +++ b/ot_livesync.c @@ -35,7 +35,7 @@ char groupip_1[4] = { 224,0,23,5 }; #define LIVESYNC_MAXDELAY 15 /* seconds */ -enum { OT_SYNC_PEER }; +enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; /* Forward declaration */ static void * livesync_worker( void * args ); @@ -47,9 +47,14 @@ static int64 g_socket_in = -1; static int64 g_socket_out = -1; static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; -char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; -static size_t g_outbuf_data; -static ot_time g_next_packet_time; +typedef struct { + uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; + size_t fill; + ot_time next_packet_time; +} sync_buffer; + +static sync_buffer g_v6_buf; +static sync_buffer g_v4_buf; static pthread_t thread_id; void livesync_init( ) { @@ -58,11 +63,17 @@ void livesync_init( ) { exerr( "No socket address for live sync specified." ); /* Prepare outgoing peers buffer */ - memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER); - g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); + memcpy( g_v6_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); + memcpy( g_v4_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); + + uint32_pack_big( (char*)g_v6_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER6); + uint32_pack_big( (char*)g_v4_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER4); + + g_v6_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); + g_v4_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); - g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; pthread_create( &thread_id, NULL, livesync_worker, NULL ); } @@ -107,28 +118,28 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { } /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ -static void livesync_issue_peersync( ) { +static void livesync_issue_peersync( sync_buffer *buf ) { char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; - size_t data = g_outbuf_data; + size_t fill = buf->fill; - memcpy( mycopy, g_outbuf, data ); - g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); - g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + memcpy( mycopy, buf->data, fill ); + buf->fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); + buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; /* From now this thread has a local copy of the buffer and has modified the protected element */ pthread_mutex_unlock(&g_outbuf_mutex); - socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT); + socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT); } -static void livesync_handle_peersync( struct ot_workstruct *ws ) { - int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); +static void livesync_handle_peersync( struct ot_workstruct *ws, size_t peer_size ) { + size_t off = sizeof( g_tracker_id ) + sizeof( uint32_t ); /* Now basic sanity checks have been done on the live sync packet We might add more testing and logging. */ - while( off + (ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4 <= ws->request_size ) { - memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), OT_PEER_SIZE4 ); + while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= ws->request_size ) { + memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), peer_size ); ws->hash = (ot_hash*)(ws->request + off); if( !g_opentracker_running ) return; @@ -138,12 +149,12 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) { else add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); - off += sizeof( ot_hash ) + sizeof( ot_peer ); + off += sizeof( ot_hash ) + peer_size; } stats_issue_event(EVENT_SYNC, 0, (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / - ((ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4)); + ((ssize_t)sizeof( ot_hash ) + peer_size)); } /* Tickle the live sync module from time to time, so no events get @@ -152,24 +163,36 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) { void livesync_ticker( ) { /* livesync_issue_peersync sets g_next_packet_time */ pthread_mutex_lock(&g_outbuf_mutex); - if( g_now_seconds > g_next_packet_time && - g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) - livesync_issue_peersync(); + if( g_now_seconds > g_v6_buf.next_packet_time && + g_v6_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) + livesync_issue_peersync(&g_v6_buf); + else + pthread_mutex_unlock(&g_outbuf_mutex); + + pthread_mutex_lock(&g_outbuf_mutex); + if( g_now_seconds > g_v4_buf.next_packet_time && + g_v4_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) + livesync_issue_peersync(&g_v4_buf); else pthread_mutex_unlock(&g_outbuf_mutex); } /* Inform live sync about whats going on. */ void livesync_tell( struct ot_workstruct *ws ) { + size_t peer_size; /* initialized in next line */ + ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size); + sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf; + pthread_mutex_lock(&g_outbuf_mutex); - memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); - memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, OT_PEER_SIZE4 ); + memcpy( dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash) ); + dest_buf->fill += sizeof(ot_hash); - g_outbuf_data += sizeof(ot_hash) + OT_PEER_SIZE4; + memcpy( dest_buf->data + dest_buf->fill, peer_src, peer_size ); + dest_buf->fill += peer_size; - if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) - livesync_issue_peersync(); + if( dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) + livesync_issue_peersync(dest_buf); else pthread_mutex_unlock(&g_outbuf_mutex); } @@ -200,8 +223,11 @@ static void * livesync_worker( void * args ) { } switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { - case OT_SYNC_PEER: - livesync_handle_peersync( &ws ); + case OT_SYNC_PEER6: + livesync_handle_peersync( &ws, OT_PEER_SIZE6 ); + break; + case OT_SYNC_PEER4: + livesync_handle_peersync( &ws, OT_PEER_SIZE4 ); break; default: break; diff --git a/ot_livesync.h b/ot_livesync.h index d7490e5..41bfc2e 100644 --- a/ot_livesync.h +++ b/ot_livesync.h @@ -28,13 +28,19 @@ Each tracker instance accumulates announce requests until its buffer is full or a timeout is reached. Then it broadcasts its live sync packer: - packet type SYNC_LIVE + packet type SYNC_LIVE4 [ 0x0008 0x14 info_hash 0x001c 0x04 peer's ipv4 address 0x0020 0x02 peer's port 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) ]* + packet type SYNC_LIVE6 + [ 0x0008 0x14 info_hash + 0x001c 0x10 peer's ipv6 address + 0x002c 0x02 peer's port + 0x002e 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) + ]* */ #ifdef WANT_SYNC_LIVE -- cgit v1.2.3