diff options
| author | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-05 03:26:42 +0200 |
|---|---|---|
| committer | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-05 03:26:42 +0200 |
| commit | a09609d94ed7f2cc8c5447f2c482abf5024b0ae5 (patch) | |
| tree | cab76f4c46689d76db808018d3f591ddf2f84482 | |
| parent | 308e91a2fa45ad3714fbd39e6cacde464b0280a8 (diff) | |
Enable live syncing v6 peers
| -rw-r--r-- | ot_livesync.c | 86 | ||||
| -rw-r--r-- | ot_livesync.h | 8 |
2 files changed, 63 insertions, 31 deletions
diff --git a/ot_livesync.c b/ot_livesync.c index b87fa6d..335cce5 100644 --- a/ot_livesync.c +++ b/ot_livesync.c | |||
| @@ -35,7 +35,7 @@ char groupip_1[4] = { 224,0,23,5 }; | |||
| 35 | 35 | ||
| 36 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | 36 | #define LIVESYNC_MAXDELAY 15 /* seconds */ |
| 37 | 37 | ||
| 38 | enum { OT_SYNC_PEER }; | 38 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; |
| 39 | 39 | ||
| 40 | /* Forward declaration */ | 40 | /* Forward declaration */ |
| 41 | static void * livesync_worker( void * args ); | 41 | static void * livesync_worker( void * args ); |
| @@ -47,9 +47,14 @@ static int64 g_socket_in = -1; | |||
| 47 | static int64 g_socket_out = -1; | 47 | static int64 g_socket_out = -1; |
| 48 | 48 | ||
| 49 | static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; | 49 | static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; |
| 50 | char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | 50 | typedef struct { |
| 51 | static size_t g_outbuf_data; | 51 | uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
| 52 | static ot_time g_next_packet_time; | 52 | size_t fill; |
| 53 | ot_time next_packet_time; | ||
| 54 | } sync_buffer; | ||
| 55 | |||
| 56 | static sync_buffer g_v6_buf; | ||
| 57 | static sync_buffer g_v4_buf; | ||
| 53 | 58 | ||
| 54 | static pthread_t thread_id; | 59 | static pthread_t thread_id; |
| 55 | void livesync_init( ) { | 60 | void livesync_init( ) { |
| @@ -58,11 +63,17 @@ void livesync_init( ) { | |||
| 58 | exerr( "No socket address for live sync specified." ); | 63 | exerr( "No socket address for live sync specified." ); |
| 59 | 64 | ||
| 60 | /* Prepare outgoing peers buffer */ | 65 | /* Prepare outgoing peers buffer */ |
| 61 | memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) ); | 66 | memcpy( g_v6_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); |
| 62 | uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER); | 67 | memcpy( g_v4_buf.data, &g_tracker_id, sizeof( g_tracker_id ) ); |
| 63 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 68 | |
| 69 | uint32_pack_big( (char*)g_v6_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER6); | ||
| 70 | uint32_pack_big( (char*)g_v4_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER4); | ||
| 71 | |||
| 72 | g_v6_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
| 73 | g_v4_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
| 64 | 74 | ||
| 65 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 75 | g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
| 76 | g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | ||
| 66 | 77 | ||
| 67 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); | 78 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); |
| 68 | } | 79 | } |
| @@ -107,28 +118,28 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | |||
| 107 | } | 118 | } |
| 108 | 119 | ||
| 109 | /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ | 120 | /* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ |
| 110 | static void livesync_issue_peersync( ) { | 121 | static void livesync_issue_peersync( sync_buffer *buf ) { |
| 111 | char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | 122 | char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
| 112 | size_t data = g_outbuf_data; | 123 | size_t fill = buf->fill; |
| 113 | 124 | ||
| 114 | memcpy( mycopy, g_outbuf, data ); | 125 | memcpy( mycopy, buf->data, fill ); |
| 115 | g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 126 | buf->fill = sizeof( g_tracker_id ) + sizeof( uint32_t ); |
| 116 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 127 | buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
| 117 | 128 | ||
| 118 | /* From now this thread has a local copy of the buffer and | 129 | /* From now this thread has a local copy of the buffer and |
| 119 | has modified the protected element */ | 130 | has modified the protected element */ |
| 120 | pthread_mutex_unlock(&g_outbuf_mutex); | 131 | pthread_mutex_unlock(&g_outbuf_mutex); |
| 121 | 132 | ||
| 122 | socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT); | 133 | socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT); |
| 123 | } | 134 | } |
| 124 | 135 | ||
| 125 | static void livesync_handle_peersync( struct ot_workstruct *ws ) { | 136 | static void livesync_handle_peersync( struct ot_workstruct *ws, size_t peer_size ) { |
| 126 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 137 | size_t off = sizeof( g_tracker_id ) + sizeof( uint32_t ); |
| 127 | 138 | ||
| 128 | /* Now basic sanity checks have been done on the live sync packet | 139 | /* Now basic sanity checks have been done on the live sync packet |
| 129 | We might add more testing and logging. */ | 140 | We might add more testing and logging. */ |
| 130 | while( off + (ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4 <= ws->request_size ) { | 141 | while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= ws->request_size ) { |
| 131 | memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), OT_PEER_SIZE4 ); | 142 | memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), peer_size ); |
| 132 | ws->hash = (ot_hash*)(ws->request + off); | 143 | ws->hash = (ot_hash*)(ws->request + off); |
| 133 | 144 | ||
| 134 | if( !g_opentracker_running ) return; | 145 | if( !g_opentracker_running ) return; |
| @@ -138,12 +149,12 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) { | |||
| 138 | else | 149 | else |
| 139 | add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); | 150 | add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); |
| 140 | 151 | ||
| 141 | off += sizeof( ot_hash ) + sizeof( ot_peer ); | 152 | off += sizeof( ot_hash ) + peer_size; |
| 142 | } | 153 | } |
| 143 | 154 | ||
| 144 | stats_issue_event(EVENT_SYNC, 0, | 155 | stats_issue_event(EVENT_SYNC, 0, |
| 145 | (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / | 156 | (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / |
| 146 | ((ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4)); | 157 | ((ssize_t)sizeof( ot_hash ) + peer_size)); |
| 147 | } | 158 | } |
| 148 | 159 | ||
| 149 | /* Tickle the live sync module from time to time, so no events get | 160 | /* Tickle the live sync module from time to time, so no events get |
| @@ -152,24 +163,36 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) { | |||
| 152 | void livesync_ticker( ) { | 163 | void livesync_ticker( ) { |
| 153 | /* livesync_issue_peersync sets g_next_packet_time */ | 164 | /* livesync_issue_peersync sets g_next_packet_time */ |
| 154 | pthread_mutex_lock(&g_outbuf_mutex); | 165 | pthread_mutex_lock(&g_outbuf_mutex); |
| 155 | if( g_now_seconds > g_next_packet_time && | 166 | if( g_now_seconds > g_v6_buf.next_packet_time && |
| 156 | g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) | 167 | g_v6_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) |
| 157 | livesync_issue_peersync(); | 168 | livesync_issue_peersync(&g_v6_buf); |
| 169 | else | ||
| 170 | pthread_mutex_unlock(&g_outbuf_mutex); | ||
| 171 | |||
| 172 | pthread_mutex_lock(&g_outbuf_mutex); | ||
| 173 | if( g_now_seconds > g_v4_buf.next_packet_time && | ||
| 174 | g_v4_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) | ||
| 175 | livesync_issue_peersync(&g_v4_buf); | ||
| 158 | else | 176 | else |
| 159 | pthread_mutex_unlock(&g_outbuf_mutex); | 177 | pthread_mutex_unlock(&g_outbuf_mutex); |
| 160 | } | 178 | } |
| 161 | 179 | ||
| 162 | /* Inform live sync about whats going on. */ | 180 | /* Inform live sync about whats going on. */ |
| 163 | void livesync_tell( struct ot_workstruct *ws ) { | 181 | void livesync_tell( struct ot_workstruct *ws ) { |
| 182 | size_t peer_size; /* initialized in next line */ | ||
| 183 | ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size); | ||
| 184 | sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf; | ||
| 185 | |||
| 164 | pthread_mutex_lock(&g_outbuf_mutex); | 186 | pthread_mutex_lock(&g_outbuf_mutex); |
| 165 | 187 | ||
| 166 | memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); | 188 | memcpy( dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash) ); |
| 167 | memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, OT_PEER_SIZE4 ); | 189 | dest_buf->fill += sizeof(ot_hash); |
| 168 | 190 | ||
| 169 | g_outbuf_data += sizeof(ot_hash) + OT_PEER_SIZE4; | 191 | memcpy( dest_buf->data + dest_buf->fill, peer_src, peer_size ); |
| 192 | dest_buf->fill += peer_size; | ||
| 170 | 193 | ||
| 171 | if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) | 194 | if( dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) |
| 172 | livesync_issue_peersync(); | 195 | livesync_issue_peersync(dest_buf); |
| 173 | else | 196 | else |
| 174 | pthread_mutex_unlock(&g_outbuf_mutex); | 197 | pthread_mutex_unlock(&g_outbuf_mutex); |
| 175 | } | 198 | } |
| @@ -200,8 +223,11 @@ static void * livesync_worker( void * args ) { | |||
| 200 | } | 223 | } |
| 201 | 224 | ||
| 202 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { | 225 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { |
| 203 | case OT_SYNC_PEER: | 226 | case OT_SYNC_PEER6: |
| 204 | livesync_handle_peersync( &ws ); | 227 | livesync_handle_peersync( &ws, OT_PEER_SIZE6 ); |
| 228 | break; | ||
| 229 | case OT_SYNC_PEER4: | ||
| 230 | livesync_handle_peersync( &ws, OT_PEER_SIZE4 ); | ||
| 205 | break; | 231 | break; |
| 206 | default: | 232 | default: |
| 207 | break; | 233 | break; |
diff --git a/ot_livesync.h b/ot_livesync.h index d7490e5..41bfc2e 100644 --- a/ot_livesync.h +++ b/ot_livesync.h | |||
| @@ -28,13 +28,19 @@ | |||
| 28 | Each tracker instance accumulates announce requests until its buffer is | 28 | Each tracker instance accumulates announce requests until its buffer is |
| 29 | full or a timeout is reached. Then it broadcasts its live sync packer: | 29 | full or a timeout is reached. Then it broadcasts its live sync packer: |
| 30 | 30 | ||
| 31 | packet type SYNC_LIVE | 31 | packet type SYNC_LIVE4 |
| 32 | [ 0x0008 0x14 info_hash | 32 | [ 0x0008 0x14 info_hash |
| 33 | 0x001c 0x04 peer's ipv4 address | 33 | 0x001c 0x04 peer's ipv4 address |
| 34 | 0x0020 0x02 peer's port | 34 | 0x0020 0x02 peer's port |
| 35 | 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) | 35 | 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) |
| 36 | ]* | 36 | ]* |
| 37 | 37 | ||
| 38 | packet type SYNC_LIVE6 | ||
| 39 | [ 0x0008 0x14 info_hash | ||
| 40 | 0x001c 0x10 peer's ipv6 address | ||
| 41 | 0x002c 0x02 peer's port | ||
| 42 | 0x002e 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) | ||
| 43 | ]* | ||
| 38 | */ | 44 | */ |
| 39 | 45 | ||
| 40 | #ifdef WANT_SYNC_LIVE | 46 | #ifdef WANT_SYNC_LIVE |
