diff options
Diffstat (limited to 'proxy.c')
-rw-r--r-- | proxy.c | 852 |
1 files changed, 450 insertions, 402 deletions
@@ -4,33 +4,33 @@ | |||
4 | $Id$ */ | 4 | $Id$ */ |
5 | 5 | ||
6 | /* System */ | 6 | /* System */ |
7 | #include <arpa/inet.h> | ||
8 | #include <ctype.h> | ||
9 | #include <errno.h> | ||
10 | #include <pthread.h> | ||
11 | #include <pwd.h> | ||
12 | #include <signal.h> | ||
7 | #include <stdint.h> | 13 | #include <stdint.h> |
14 | #include <stdio.h> | ||
8 | #include <stdlib.h> | 15 | #include <stdlib.h> |
9 | #include <string.h> | 16 | #include <string.h> |
10 | #include <arpa/inet.h> | ||
11 | #include <sys/socket.h> | 17 | #include <sys/socket.h> |
12 | #include <unistd.h> | 18 | #include <unistd.h> |
13 | #include <errno.h> | ||
14 | #include <signal.h> | ||
15 | #include <stdio.h> | ||
16 | #include <pwd.h> | ||
17 | #include <ctype.h> | ||
18 | #include <pthread.h> | ||
19 | 19 | ||
20 | /* Libowfat */ | 20 | /* Libowfat */ |
21 | #include "socket.h" | 21 | #include "byte.h" |
22 | #include "io.h" | 22 | #include "io.h" |
23 | #include "iob.h" | 23 | #include "iob.h" |
24 | #include "byte.h" | ||
25 | #include "scan.h" | ||
26 | #include "ip6.h" | 24 | #include "ip6.h" |
27 | #include "ndelay.h" | 25 | #include "ndelay.h" |
26 | #include "scan.h" | ||
27 | #include "socket.h" | ||
28 | 28 | ||
29 | /* Opentracker */ | 29 | /* Opentracker */ |
30 | #include "trackerlogic.h" | ||
31 | #include "ot_vector.h" | ||
32 | #include "ot_mutex.h" | 30 | #include "ot_mutex.h" |
33 | #include "ot_stats.h" | 31 | #include "ot_stats.h" |
32 | #include "ot_vector.h" | ||
33 | #include "trackerlogic.h" | ||
34 | 34 | ||
35 | #ifndef WANT_SYNC_LIVE | 35 | #ifndef WANT_SYNC_LIVE |
36 | #define WANT_SYNC_LIVE | 36 | #define WANT_SYNC_LIVE |
@@ -40,28 +40,28 @@ | |||
40 | ot_ip6 g_serverip; | 40 | ot_ip6 g_serverip; |
41 | uint16_t g_serverport = 9009; | 41 | uint16_t g_serverport = 9009; |
42 | uint32_t g_tracker_id; | 42 | uint32_t g_tracker_id; |
43 | char groupip_1[4] = { 224,0,23,5 }; | 43 | char groupip_1[4] = {224, 0, 23, 5}; |
44 | int g_self_pipe[2]; | 44 | int g_self_pipe[2]; |
45 | 45 | ||
46 | /* If you have more than 10 peers, don't use this proxy | 46 | /* If you have more than 10 peers, don't use this proxy |
47 | Use 20 slots for 10 peers to have room for 10 incoming connection slots | 47 | Use 20 slots for 10 peers to have room for 10 incoming connection slots |
48 | */ | 48 | */ |
49 | #define MAX_PEERS 20 | 49 | #define MAX_PEERS 20 |
50 | 50 | ||
51 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) | 51 | #define LIVESYNC_INCOMING_BUFFSIZE (256 * 256) |
52 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) | 52 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256) |
53 | 53 | ||
54 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 54 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
55 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 55 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash)) |
56 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | 56 | #define LIVESYNC_MAXDELAY 15 /* seconds */ |
57 | 57 | ||
58 | /* The amount of time a complete sync cycle should take */ | 58 | /* The amount of time a complete sync cycle should take */ |
59 | #define OT_SYNC_INTERVAL_MINUTES 2 | 59 | #define OT_SYNC_INTERVAL_MINUTES 2 |
60 | 60 | ||
61 | /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ | 61 | /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ |
62 | #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) | 62 | #define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT)) |
63 | 63 | ||
64 | enum { OT_SYNC_PEER }; | 64 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; |
65 | enum { FLAG_SERVERSOCKET = 1 }; | 65 | enum { FLAG_SERVERSOCKET = 1 }; |
66 | 66 | ||
67 | /* For incoming packets */ | 67 | /* For incoming packets */ |
@@ -75,145 +75,153 @@ static uint8_t *g_peerbuffer_pos; | |||
75 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; | 75 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; |
76 | static ot_time g_next_packet_time; | 76 | static ot_time g_next_packet_time; |
77 | 77 | ||
78 | static void * livesync_worker( void * args ); | 78 | static void *livesync_worker(void *args); |
79 | static void * streamsync_worker( void * args ); | 79 | static void *streamsync_worker(void *args); |
80 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); | 80 | static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer); |
81 | 81 | ||
82 | void exerr( char * message ) { | 82 | void exerr(char *message) { |
83 | fprintf( stderr, "%s\n", message ); | 83 | fprintf(stderr, "%s\n", message); |
84 | exit( 111 ); | 84 | exit(111); |
85 | } | 85 | } |
86 | 86 | ||
87 | void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { | 87 | void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) { |
88 | (void) event; | 88 | (void)event; |
89 | (void) proto; | 89 | (void)proto; |
90 | (void) event_data; | 90 | (void)event_data; |
91 | } | 91 | } |
92 | 92 | ||
93 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | 93 | void livesync_bind_mcast(ot_ip6 ip, uint16_t port) { |
94 | char tmpip[4] = {0,0,0,0}; | 94 | char tmpip[4] = {0, 0, 0, 0}; |
95 | char *v4ip; | 95 | char *v4ip; |
96 | 96 | ||
97 | if( !ip6_isv4mapped(ip)) | 97 | if (!ip6_isv4mapped(ip)) |
98 | exerr("v6 mcast support not yet available."); | 98 | exerr("v6 mcast support not yet available."); |
99 | v4ip = ip+12; | 99 | v4ip = ip + 12; |
100 | 100 | ||
101 | if( g_socket_in != -1 ) | 101 | if (g_socket_in != -1) |
102 | exerr("Error: Livesync listen ip specified twice."); | 102 | exerr("Error: Livesync listen ip specified twice."); |
103 | 103 | ||
104 | if( ( g_socket_in = socket_udp4( )) < 0) | 104 | if ((g_socket_in = socket_udp4()) < 0) |
105 | exerr("Error: Cant create live sync incoming socket." ); | 105 | exerr("Error: Cant create live sync incoming socket."); |
106 | ndelay_off(g_socket_in); | 106 | ndelay_off(g_socket_in); |
107 | 107 | ||
108 | if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) | 108 | if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1) |
109 | exerr("Error: Cant bind live sync incoming socket." ); | 109 | exerr("Error: Cant bind live sync incoming socket."); |
110 | 110 | ||
111 | if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) | 111 | if (socket_mcjoin4(g_socket_in, groupip_1, v4ip)) |
112 | exerr("Error: Cant make live sync incoming socket join mcast group."); | 112 | exerr("Error: Cant make live sync incoming socket join mcast group."); |
113 | 113 | ||
114 | if( ( g_socket_out = socket_udp4()) < 0) | 114 | if ((g_socket_out = socket_udp4()) < 0) |
115 | exerr("Error: Cant create live sync outgoing socket." ); | 115 | exerr("Error: Cant create live sync outgoing socket."); |
116 | if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) | 116 | if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1) |
117 | exerr("Error: Cant bind live sync outgoing socket." ); | 117 | exerr("Error: Cant bind live sync outgoing socket."); |
118 | 118 | ||
119 | socket_mcttl4(g_socket_out, 1); | 119 | socket_mcttl4(g_socket_out, 1); |
120 | socket_mcloop4(g_socket_out, 1); | 120 | socket_mcloop4(g_socket_out, 1); |
121 | } | 121 | } |
122 | 122 | ||
123 | size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { | 123 | size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { |
124 | int exactmatch; | 124 | int exactmatch; |
125 | ot_torrent *torrent; | 125 | ot_torrent *torrent; |
126 | ot_peer *peer_dest; | 126 | ot_peerlist *peer_list; |
127 | ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); | 127 | ot_peer *peer_dest; |
128 | 128 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); | |
129 | torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); | 129 | size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); |
130 | if( !torrent ) | 130 | |
131 | torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch); | ||
132 | if (!torrent) | ||
131 | return -1; | 133 | return -1; |
132 | 134 | ||
133 | if( !exactmatch ) { | 135 | if (!exactmatch) { |
134 | /* Create a new torrent entry, then */ | 136 | /* Create a new torrent entry, then */ |
135 | memcpy( torrent->hash, hash, sizeof(ot_hash) ); | 137 | memcpy(torrent->hash, hash, sizeof(ot_hash)); |
136 | 138 | ||
137 | if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { | 139 | if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) { |
138 | vector_remove_torrent( torrents_list, torrent ); | 140 | vector_remove_torrent(torrents_list, torrent); |
139 | mutex_bucket_unlock_by_hash( hash, 0 ); | 141 | mutex_bucket_unlock_by_hash(hash, 0); |
140 | return -1; | 142 | return -1; |
141 | } | 143 | } |
142 | 144 | ||
143 | byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); | 145 | byte_zero(torrent->peer_list6, sizeof(ot_peerlist)); |
146 | byte_zero(torrent->peer_list4, sizeof(ot_peerlist)); | ||
144 | } | 147 | } |
145 | 148 | ||
149 | peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; | ||
150 | |||
146 | /* Check for peer in torrent */ | 151 | /* Check for peer in torrent */ |
147 | peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); | 152 | peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch); |
148 | if( !peer_dest ) { | 153 | if (!peer_dest) { |
149 | mutex_bucket_unlock_by_hash( hash, 0 ); | 154 | mutex_bucket_unlock_by_hash(hash, 0); |
150 | return -1; | 155 | return -1; |
151 | } | 156 | } |
152 | /* Tell peer that it's fresh */ | 157 | /* Tell peer that it's fresh */ |
153 | OT_PEERTIME( peer ) = 0; | 158 | OT_PEERTIME(peer, peer_size) = 0; |
154 | 159 | ||
155 | /* If we hadn't had a match create peer there */ | 160 | /* If we hadn't had a match create peer there */ |
156 | if( !exactmatch ) { | 161 | if (!exactmatch) { |
157 | torrent->peer_list->peer_count++; | 162 | peer_list->peer_count++; |
158 | if( OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) | 163 | if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING) |
159 | torrent->peer_list->seed_count++; | 164 | peer_list->seed_count++; |
160 | } | 165 | } |
161 | memcpy( peer_dest, peer, sizeof(ot_peer) ); | 166 | memcpy(peer_dest, peer, peer_size); |
162 | mutex_bucket_unlock_by_hash( hash, 0 ); | 167 | mutex_bucket_unlock_by_hash(hash, 0); |
163 | return 0; | 168 | return 0; |
164 | } | 169 | } |
165 | 170 | ||
166 | size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) { | 171 | size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { |
167 | int exactmatch; | 172 | int exactmatch; |
168 | ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); | 173 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); |
169 | ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); | 174 | ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch); |
170 | 175 | ||
171 | if( exactmatch ) { | 176 | if (exactmatch) { |
172 | ot_peerlist *peer_list = torrent->peer_list; | 177 | ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; |
173 | switch( vector_remove_peer( &peer_list->peers, peer ) ) { | 178 | switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) { |
174 | case 2: peer_list->seed_count--; /* Fall throughs intended */ | 179 | case 2: |
175 | case 1: peer_list->peer_count--; /* Fall throughs intended */ | 180 | peer_list->seed_count--; /* Intentional fallthrough */ |
176 | default: break; | 181 | case 1: |
182 | peer_list->peer_count--; /* Intentional fallthrough */ | ||
183 | default: | ||
184 | break; | ||
177 | } | 185 | } |
178 | } | 186 | } |
179 | 187 | ||
180 | mutex_bucket_unlock_by_hash( hash, 0 ); | 188 | mutex_bucket_unlock_by_hash(hash, 0); |
181 | return 0; | 189 | return 0; |
182 | } | 190 | } |
183 | 191 | ||
184 | void free_peerlist( ot_peerlist *peer_list ) { | 192 | void free_peerlist(ot_peerlist *peer_list) { |
185 | if( peer_list->peers.data ) { | 193 | if (peer_list->peers.data) { |
186 | if( OT_PEERLIST_HASBUCKETS( peer_list ) ) { | 194 | if (OT_PEERLIST_HASBUCKETS(peer_list)) { |
187 | ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data); | 195 | ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data); |
188 | 196 | ||
189 | while( peer_list->peers.size-- ) | 197 | while (peer_list->peers.size--) |
190 | free( bucket_list++->data ); | 198 | free(bucket_list++->data); |
191 | } | 199 | } |
192 | free( peer_list->peers.data ); | 200 | free(peer_list->peers.data); |
193 | } | 201 | } |
194 | free( peer_list ); | 202 | free(peer_list); |
195 | } | 203 | } |
196 | 204 | ||
197 | static void livesync_handle_peersync( ssize_t datalen ) { | 205 | static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) { |
198 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 206 | int off = sizeof(g_tracker_id) + sizeof(uint32_t); |
199 | 207 | ||
200 | fprintf( stderr, "." ); | 208 | fprintf(stderr, "."); |
201 | 209 | ||
202 | while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { | 210 | while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) { |
203 | ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); | 211 | ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash)); |
204 | ot_hash *hash = (ot_hash*)(g_inbuffer + off); | 212 | ot_hash *hash = (ot_hash *)(g_inbuffer + off); |
205 | 213 | ||
206 | if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) | 214 | if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED) |
207 | remove_peer_from_torrent_proxy( *hash, peer ); | 215 | remove_peer_from_torrent_proxy(*hash, peer, peer_size); |
208 | else | 216 | else |
209 | add_peer_to_torrent_proxy( *hash, peer ); | 217 | add_peer_to_torrent_proxy(*hash, peer, peer_size); |
210 | 218 | ||
211 | off += sizeof( ot_hash ) + sizeof( ot_peer ); | 219 | off += sizeof(ot_hash) + peer_size; |
212 | } | 220 | } |
213 | } | 221 | } |
214 | 222 | ||
215 | int usage( char *self ) { | 223 | int usage(char *self) { |
216 | fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self ); | 224 | fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self); |
217 | return 0; | 225 | return 0; |
218 | } | 226 | } |
219 | 227 | ||
@@ -228,115 +236,115 @@ enum { | |||
228 | FLAG_MASK = 0x07 | 236 | FLAG_MASK = 0x07 |
229 | }; | 237 | }; |
230 | 238 | ||
231 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) | 239 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING) |
232 | #define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED) | 240 | #define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED) |
233 | #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) | 241 | #define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED) |
234 | #define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) | 242 | #define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING) |
235 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) | 243 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID) |
236 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) | 244 | #define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED) |
237 | 245 | ||
238 | typedef struct { | 246 | typedef struct { |
239 | int state; /* Whether we want to connect, how far our handshake is, etc. */ | 247 | int state; /* Whether we want to connect, how far our handshake is, etc. */ |
240 | ot_ip6 ip; /* The peer to connect to */ | 248 | ot_ip6 ip; /* The peer to connect to */ |
241 | uint16_t port; /* The peers port */ | 249 | uint16_t port; /* The peers port */ |
242 | uint8_t indata[8192*16]; /* Any data not processed yet */ | 250 | uint8_t indata[8192 * 16]; /* Any data not processed yet */ |
243 | size_t indata_length; /* Length of unprocessed data */ | 251 | size_t indata_length; /* Length of unprocessed data */ |
244 | uint32_t tracker_id; /* How the other end greeted */ | 252 | uint32_t tracker_id; /* How the other end greeted */ |
245 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ | 253 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ |
246 | io_batch outdata; /* The iobatch containing our sync data */ | 254 | io_batch outdata; /* The iobatch containing our sync data */ |
247 | 255 | ||
248 | size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ | 256 | size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ |
249 | uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ | 257 | uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ |
250 | uint8_t packet_type; /* Type of current packet */ | 258 | uint8_t packet_type; /* Type of current packet */ |
251 | uint32_t packet_tid; /* Tracker id for current packet */ | 259 | uint32_t packet_tid; /* Tracker id for current packet */ |
252 | 260 | ||
253 | } proxy_peer; | 261 | } proxy_peer; |
254 | static void process_indata( proxy_peer * peer ); | 262 | static void process_indata(proxy_peer *peer); |
255 | 263 | ||
256 | void reset_info_block( proxy_peer * peer ) { | 264 | void reset_info_block(proxy_peer *peer) { |
257 | peer->indata_length = 0; | 265 | peer->indata_length = 0; |
258 | peer->tracker_id = 0; | 266 | peer->tracker_id = 0; |
259 | peer->fd = -1; | 267 | peer->fd = -1; |
260 | peer->packet_tcount = 0; | 268 | peer->packet_tcount = 0; |
261 | iob_reset( &peer->outdata ); | 269 | iob_reset(&peer->outdata); |
262 | PROXYPEER_SETDISCONNECTED( peer->state ); | 270 | PROXYPEER_SETDISCONNECTED(peer->state); |
263 | } | 271 | } |
264 | 272 | ||
265 | /* Number of connections to peers | 273 | /* Number of connections to peers |
266 | * If a peer's IP is set, we try to reconnect, when the connection drops | 274 | * If a peer's IP is set, we try to reconnect, when the connection drops |
267 | * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it | 275 | * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it |
268 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match | 276 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match |
269 | * Reconnect attempts occur only twice a minute | 277 | * Reconnect attempts occur only twice a minute |
270 | */ | 278 | */ |
271 | static int g_connection_count; | 279 | static int g_connection_count; |
272 | static ot_time g_connection_reconn; | 280 | static ot_time g_connection_reconn; |
273 | static proxy_peer g_connections[MAX_PEERS]; | 281 | static proxy_peer g_connections[MAX_PEERS]; |
274 | 282 | ||
275 | static void handle_reconnects( void ) { | 283 | static void handle_reconnects(void) { |
276 | int i; | 284 | int i; |
277 | for( i=0; i<g_connection_count; ++i ) | 285 | for (i = 0; i < g_connection_count; ++i) |
278 | if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { | 286 | if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) { |
279 | int64 newfd = socket_tcp6( ); | 287 | int64 newfd = socket_tcp6(); |
280 | fprintf( stderr, "(Re)connecting to peer..." ); | 288 | fprintf(stderr, "(Re)connecting to peer..."); |
281 | if( newfd < 0 ) continue; /* No socket for you */ | 289 | if (newfd < 0) |
290 | continue; /* No socket for you */ | ||
282 | io_fd(newfd); | 291 | io_fd(newfd); |
283 | if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { | 292 | if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) { |
284 | io_close( newfd ); | 293 | io_close(newfd); |
285 | continue; | 294 | continue; |
286 | } | 295 | } |
287 | if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && | 296 | if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) { |
288 | errno != EINPROGRESS && errno != EWOULDBLOCK ) { | ||
289 | close(newfd); | 297 | close(newfd); |
290 | continue; | 298 | continue; |
291 | } | 299 | } |
292 | io_wantwrite(newfd); /* So we will be informed when it is connected */ | 300 | io_wantwrite(newfd); /* So we will be informed when it is connected */ |
293 | io_setcookie(newfd,g_connections+i); | 301 | io_setcookie(newfd, g_connections + i); |
294 | 302 | ||
295 | /* Prepare connection info block */ | 303 | /* Prepare connection info block */ |
296 | reset_info_block( g_connections+i ); | 304 | reset_info_block(g_connections + i); |
297 | g_connections[i].fd = newfd; | 305 | g_connections[i].fd = newfd; |
298 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 306 | PROXYPEER_SETCONNECTING(g_connections[i].state); |
299 | } | 307 | } |
300 | g_connection_reconn = time(NULL) + 30; | 308 | g_connection_reconn = time(NULL) + 30; |
301 | } | 309 | } |
302 | 310 | ||
303 | /* Handle incoming connection requests, check against whitelist */ | 311 | /* Handle incoming connection requests, check against whitelist */ |
304 | static void handle_accept( int64 serversocket ) { | 312 | static void handle_accept(int64 serversocket) { |
305 | int64 newfd; | 313 | int64 newfd; |
306 | ot_ip6 ip; | 314 | ot_ip6 ip; |
307 | uint16 port; | 315 | uint16 port; |
308 | 316 | ||
309 | while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { | 317 | while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) { |
310 | 318 | ||
311 | /* XXX some access control */ | 319 | /* XXX some access control */ |
312 | 320 | ||
313 | /* Put fd into a non-blocking mode */ | 321 | /* Put fd into a non-blocking mode */ |
314 | io_nonblock( newfd ); | 322 | io_nonblock(newfd); |
315 | 323 | ||
316 | if( !io_fd( newfd ) ) | 324 | if (!io_fd(newfd)) |
317 | io_close( newfd ); | 325 | io_close(newfd); |
318 | else { | 326 | else { |
319 | /* Find a new home for our incoming connection */ | 327 | /* Find a new home for our incoming connection */ |
320 | int i; | 328 | int i; |
321 | for( i=0; i<MAX_PEERS; ++i ) | 329 | for (i = 0; i < MAX_PEERS; ++i) |
322 | if( g_connections[i].state == FLAG_DISCONNECTED ) | 330 | if (g_connections[i].state == FLAG_DISCONNECTED) |
323 | break; | 331 | break; |
324 | if( i == MAX_PEERS ) { | 332 | if (i == MAX_PEERS) { |
325 | fprintf( stderr, "No room for incoming connection." ); | 333 | fprintf(stderr, "No room for incoming connection."); |
326 | close( newfd ); | 334 | close(newfd); |
327 | continue; | 335 | continue; |
328 | } | 336 | } |
329 | 337 | ||
330 | /* Prepare connection info block */ | 338 | /* Prepare connection info block */ |
331 | reset_info_block( g_connections+i ); | 339 | reset_info_block(g_connections + i); |
332 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 340 | PROXYPEER_SETCONNECTING(g_connections[i].state); |
333 | g_connections[i].port = port; | 341 | g_connections[i].port = port; |
334 | g_connections[i].fd = newfd; | 342 | g_connections[i].fd = newfd; |
335 | 343 | ||
336 | io_setcookie( newfd, g_connections + i ); | 344 | io_setcookie(newfd, g_connections + i); |
337 | 345 | ||
338 | /* We expect the connecting side to begin with its tracker_id */ | 346 | /* We expect the connecting side to begin with its tracker_id */ |
339 | io_wantread( newfd ); | 347 | io_wantread(newfd); |
340 | } | 348 | } |
341 | } | 349 | } |
342 | 350 | ||
@@ -344,117 +352,116 @@ static void handle_accept( int64 serversocket ) { | |||
344 | } | 352 | } |
345 | 353 | ||
346 | /* New sync data on the stream */ | 354 | /* New sync data on the stream */ |
347 | static void handle_read( int64 peersocket ) { | 355 | static void handle_read(int64 peersocket) { |
348 | int i; | 356 | int i; |
349 | int64 datalen; | 357 | int64 datalen; |
350 | uint32_t tracker_id; | 358 | uint32_t tracker_id; |
351 | proxy_peer *peer = io_getcookie( peersocket ); | 359 | proxy_peer *peer = io_getcookie(peersocket); |
352 | 360 | ||
353 | if( !peer ) { | 361 | if (!peer) { |
354 | /* Can't happen ;) */ | 362 | /* Can't happen ;) */ |
355 | io_close( peersocket ); | 363 | io_close(peersocket); |
356 | return; | 364 | return; |
357 | } | 365 | } |
358 | switch( peer->state & FLAG_MASK ) { | 366 | switch (peer->state & FLAG_MASK) { |
359 | case FLAG_DISCONNECTED: | 367 | case FLAG_DISCONNECTED: |
360 | io_close( peersocket ); | 368 | io_close(peersocket); |
361 | break; /* Shouldnt happen */ | 369 | break; /* Shouldnt happen */ |
362 | case FLAG_CONNECTING: | 370 | case FLAG_CONNECTING: |
363 | case FLAG_WAITTRACKERID: | 371 | case FLAG_WAITTRACKERID: |
364 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) | 372 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) |
365 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ | 373 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ |
366 | if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | 374 | if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id)) |
367 | goto close_socket; | 375 | goto close_socket; |
368 | 376 | ||
369 | /* See, if we already have a connection to that peer */ | 377 | /* See, if we already have a connection to that peer */ |
370 | for( i=0; i<MAX_PEERS; ++i ) | 378 | for (i = 0; i < MAX_PEERS; ++i) |
371 | if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && | 379 | if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) { |
372 | g_connections[i].tracker_id == tracker_id ) { | 380 | fprintf(stderr, "Peer already connected. Closing connection.\n"); |
373 | fprintf( stderr, "Peer already connected. Closing connection.\n" ); | ||
374 | goto close_socket; | 381 | goto close_socket; |
375 | } | 382 | } |
376 | 383 | ||
377 | /* Also no need for soliloquy */ | 384 | /* Also no need for soliloquy */ |
378 | if( tracker_id == g_tracker_id ) | 385 | if (tracker_id == g_tracker_id) |
379 | goto close_socket; | 386 | goto close_socket; |
380 | 387 | ||
381 | /* The new connection is good, send our tracker_id on incoming connections */ | 388 | /* The new connection is good, send our tracker_id on incoming connections */ |
382 | if( peer->state == FLAG_CONNECTING ) | 389 | if (peer->state == FLAG_CONNECTING) |
383 | if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) ) | 390 | if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id)) |
384 | goto close_socket; | 391 | goto close_socket; |
385 | 392 | ||
386 | peer->tracker_id = tracker_id; | 393 | peer->tracker_id = tracker_id; |
387 | PROXYPEER_SETCONNECTED( peer->state ); | 394 | PROXYPEER_SETCONNECTED(peer->state); |
388 | 395 | ||
389 | if( peer->state & FLAG_OUTGOING ) | 396 | if (peer->state & FLAG_OUTGOING) |
390 | fprintf( stderr, "succeeded.\n" ); | 397 | fprintf(stderr, "succeeded.\n"); |
391 | else | 398 | else |
392 | fprintf( stderr, "Incoming connection successful.\n" ); | 399 | fprintf(stderr, "Incoming connection successful.\n"); |
393 | 400 | ||
394 | break; | 401 | break; |
395 | close_socket: | 402 | close_socket: |
396 | fprintf( stderr, "Handshake incomplete, closing socket\n" ); | 403 | fprintf(stderr, "Handshake incomplete, closing socket\n"); |
397 | io_close( peersocket ); | 404 | io_close(peersocket); |
398 | reset_info_block( peer ); | 405 | reset_info_block(peer); |
399 | break; | 406 | break; |
400 | case FLAG_CONNECTED: | 407 | case FLAG_CONNECTED: |
401 | /* Here we acutally expect data from peer | 408 | /* Here we acutally expect data from peer |
402 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ | 409 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ |
403 | datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); | 410 | datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length); |
404 | if( !datalen || datalen < -1 ) { | 411 | if (!datalen || datalen < -1) { |
405 | fprintf( stderr, "Connection closed by remote peer.\n" ); | 412 | fprintf(stderr, "Connection closed by remote peer.\n"); |
406 | io_close( peersocket ); | 413 | io_close(peersocket); |
407 | reset_info_block( peer ); | 414 | reset_info_block(peer); |
408 | } else if( datalen > 0 ) { | 415 | } else if (datalen > 0) { |
409 | peer->indata_length += datalen; | 416 | peer->indata_length += datalen; |
410 | process_indata( peer ); | 417 | process_indata(peer); |
411 | } | 418 | } |
412 | break; | 419 | break; |
413 | } | 420 | } |
414 | } | 421 | } |
415 | 422 | ||
416 | /* Can write new sync data to the stream */ | 423 | /* Can write new sync data to the stream */ |
417 | static void handle_write( int64 peersocket ) { | 424 | static void handle_write(int64 peersocket) { |
418 | proxy_peer *peer = io_getcookie( peersocket ); | 425 | proxy_peer *peer = io_getcookie(peersocket); |
419 | 426 | ||
420 | if( !peer ) { | 427 | if (!peer) { |
421 | /* Can't happen ;) */ | 428 | /* Can't happen ;) */ |
422 | io_close( peersocket ); | 429 | io_close(peersocket); |
423 | return; | 430 | return; |
424 | } | 431 | } |
425 | 432 | ||
426 | switch( peer->state & FLAG_MASK ) { | 433 | switch (peer->state & FLAG_MASK) { |
427 | case FLAG_DISCONNECTED: | 434 | case FLAG_DISCONNECTED: |
428 | default: /* Should not happen */ | 435 | default: /* Should not happen */ |
429 | io_close( peersocket ); | 436 | io_close(peersocket); |
430 | break; | 437 | break; |
431 | case FLAG_CONNECTING: | 438 | case FLAG_CONNECTING: |
432 | /* Ensure that the connection is established and handle connection error */ | 439 | /* Ensure that the connection is established and handle connection error */ |
433 | if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { | 440 | if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) { |
434 | fprintf( stderr, "failed\n" ); | 441 | fprintf(stderr, "failed\n"); |
435 | reset_info_block( peer ); | 442 | reset_info_block(peer); |
436 | io_close( peersocket ); | 443 | io_close(peersocket); |
437 | break; | 444 | break; |
438 | } | 445 | } |
439 | 446 | ||
440 | if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) { | 447 | if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) { |
441 | PROXYPEER_SETWAITTRACKERID( peer->state ); | 448 | PROXYPEER_SETWAITTRACKERID(peer->state); |
442 | io_dontwantwrite( peersocket ); | 449 | io_dontwantwrite(peersocket); |
443 | io_wantread( peersocket ); | 450 | io_wantread(peersocket); |
444 | } else { | 451 | } else { |
445 | fprintf( stderr, "Handshake incomplete, closing socket\n" ); | 452 | fprintf(stderr, "Handshake incomplete, closing socket\n"); |
446 | io_close( peersocket ); | 453 | io_close(peersocket); |
447 | reset_info_block( peer ); | 454 | reset_info_block(peer); |
448 | } | 455 | } |
449 | break; | 456 | break; |
450 | case FLAG_CONNECTED: | 457 | case FLAG_CONNECTED: |
451 | switch( iob_send( peersocket, &peer->outdata ) ) { | 458 | switch (iob_send(peersocket, &peer->outdata)) { |
452 | case 0: /* all data sent */ | 459 | case 0: /* all data sent */ |
453 | io_dontwantwrite( peersocket ); | 460 | io_dontwantwrite(peersocket); |
454 | break; | 461 | break; |
455 | case -3: /* an error occured */ | 462 | case -3: /* an error occured */ |
456 | io_close( peersocket ); | 463 | io_close(peersocket); |
457 | reset_info_block( peer ); | 464 | reset_info_block(peer); |
458 | break; | 465 | break; |
459 | default: /* Normal operation or eagain */ | 466 | default: /* Normal operation or eagain */ |
460 | break; | 467 | break; |
@@ -469,290 +476,324 @@ static void server_mainloop() { | |||
469 | int64 sock; | 476 | int64 sock; |
470 | 477 | ||
471 | /* inlined livesync_init() */ | 478 | /* inlined livesync_init() */ |
472 | memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); | 479 | memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start)); |
473 | g_peerbuffer_pos = g_peerbuffer_start; | 480 | g_peerbuffer_pos = g_peerbuffer_start; |
474 | memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | 481 | memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id)); |
475 | uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); | 482 | uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER); |
476 | g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | 483 | g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t); |
477 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | 484 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; |
478 | 485 | ||
479 | while(1) { | 486 | while (1) { |
480 | /* See, if we need to connect to anyone */ | 487 | /* See if we need to connect to anyone */ |
481 | if( time(NULL) > g_connection_reconn ) | 488 | if (time(NULL) > g_connection_reconn) |
482 | handle_reconnects( ); | 489 | handle_reconnects(); |
483 | 490 | ||
484 | /* Wait for io events until next approx reconn check time */ | 491 | /* Wait for io events until next approx reconn check time */ |
485 | io_waituntil2( 30*1000 ); | 492 | io_waituntil2(30 * 1000); |
486 | 493 | ||
487 | /* Loop over readable sockets */ | 494 | /* Loop over readable sockets */ |
488 | while( ( sock = io_canread( ) ) != -1 ) { | 495 | while ((sock = io_canread()) != -1) { |
489 | const void *cookie = io_getcookie( sock ); | 496 | const void *cookie = io_getcookie(sock); |
490 | if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) | 497 | if ((uintptr_t)cookie == FLAG_SERVERSOCKET) |
491 | handle_accept( sock ); | 498 | handle_accept(sock); |
492 | else | 499 | else |
493 | handle_read( sock ); | 500 | handle_read(sock); |
494 | } | 501 | } |
495 | 502 | ||
496 | /* Loop over writable sockets */ | 503 | /* Loop over writable sockets */ |
497 | while( ( sock = io_canwrite( ) ) != -1 ) | 504 | while ((sock = io_canwrite()) != -1) |
498 | handle_write( sock ); | 505 | handle_write(sock); |
499 | 506 | ||
500 | livesync_ticker( ); | 507 | livesync_ticker(); |
501 | } | 508 | } |
502 | } | 509 | } |
503 | 510 | ||
504 | static void panic( const char *routine ) { | 511 | static void panic(const char *routine) { |
505 | fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); | 512 | fprintf(stderr, "%s: %s\n", routine, strerror(errno)); |
506 | exit( 111 ); | 513 | exit(111); |
507 | } | 514 | } |
508 | 515 | ||
509 | static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { | 516 | static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) { |
510 | int64 sock = socket_tcp6( ); | 517 | int64 sock = socket_tcp6(); |
511 | 518 | ||
512 | if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) | 519 | if (socket_bind6_reuse(sock, ip, port, 0) == -1) |
513 | panic( "socket_bind6_reuse" ); | 520 | panic("socket_bind6_reuse"); |
514 | 521 | ||
515 | if( socket_listen( sock, SOMAXCONN) == -1 ) | 522 | if (socket_listen(sock, SOMAXCONN) == -1) |
516 | panic( "socket_listen" ); | 523 | panic("socket_listen"); |
517 | 524 | ||
518 | if( !io_fd( sock ) ) | 525 | if (!io_fd(sock)) |
519 | panic( "io_fd" ); | 526 | panic("io_fd"); |
520 | 527 | ||
521 | io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); | 528 | io_setcookie(sock, (void *)FLAG_SERVERSOCKET); |
522 | io_wantread( sock ); | 529 | io_wantread(sock); |
523 | return sock; | 530 | return sock; |
524 | } | 531 | } |
525 | 532 | ||
526 | 533 | static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) { | |
527 | static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) { | ||
528 | const char *s = src; | 534 | const char *s = src; |
529 | int off, bracket = 0; | 535 | int off, bracket = 0; |
530 | while( isspace(*s) ) ++s; | 536 | while (isspace(*s)) |
531 | if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ | 537 | ++s; |
532 | if( !(off = scan_ip6( s, ip ) ) ) | 538 | if (*s == '[') |
539 | ++s, ++bracket; /* for v6 style notation */ | ||
540 | if (!(off = scan_ip6(s, ip))) | ||
533 | return 0; | 541 | return 0; |
534 | s += off; | 542 | s += off; |
535 | if( *s == 0 || isspace(*s)) return s-src; | 543 | if (*s == 0 || isspace(*s)) |
536 | if( *s == ']' && bracket ) ++s; | 544 | return s - src; |
537 | if( !ip6_isv4mapped(ip)){ | 545 | if (*s == ']' && bracket) |
538 | if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; | 546 | ++s; |
547 | if (!ip6_isv4mapped(ip)) { | ||
548 | if ((bracket && *(s) != ':') || (*(s) != '.')) | ||
549 | return 0; | ||
539 | s++; | 550 | s++; |
540 | } else { | 551 | } else { |
541 | if( *(s++) != ':' ) return 0; | 552 | if (*(s++) != ':') |
553 | return 0; | ||
542 | } | 554 | } |
543 | if( !(off = scan_ushort (s, port ) ) ) | 555 | if (!(off = scan_ushort(s, port))) |
544 | return 0; | 556 | return 0; |
545 | return off+s-src; | 557 | return off + s - src; |
546 | } | 558 | } |
547 | 559 | ||
548 | int main( int argc, char **argv ) { | 560 | int main(int argc, char **argv) { |
549 | static pthread_t sync_in_thread_id; | 561 | static pthread_t sync_in_thread_id; |
550 | static pthread_t sync_out_thread_id; | 562 | static pthread_t sync_out_thread_id; |
551 | ot_ip6 serverip; | 563 | ot_ip6 serverip; |
552 | uint16_t tmpport; | 564 | uint16_t tmpport; |
553 | int scanon = 1, lbound = 0, sbound = 0; | 565 | int scanon = 1, lbound = 0, sbound = 0; |
554 | 566 | ||
555 | srandom( time(NULL) ); | 567 | srandom(time(NULL)); |
556 | #ifdef WANT_ARC4RANDOM | 568 | #ifdef WANT_ARC4RANDOM |
557 | g_tracker_id = arc4random(); | 569 | g_tracker_id = arc4random(); |
558 | #else | 570 | #else |
559 | g_tracker_id = random(); | 571 | g_tracker_id = random(); |
560 | #endif | 572 | #endif |
561 | noipv6=1; | ||
562 | 573 | ||
563 | while( scanon ) { | 574 | while (scanon) { |
564 | switch( getopt( argc, argv, ":l:c:L:h" ) ) { | 575 | switch (getopt(argc, argv, ":l:c:L:h")) { |
565 | case -1: scanon = 0; break; | 576 | case -1: |
577 | scanon = 0; | ||
578 | break; | ||
566 | case 'l': | 579 | case 'l': |
567 | tmpport = 0; | 580 | tmpport = 0; |
568 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } | 581 | if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { |
569 | ot_try_bind( serverip, tmpport ); | 582 | usage(argv[0]); |
583 | exit(1); | ||
584 | } | ||
585 | ot_try_bind(serverip, tmpport); | ||
570 | ++sbound; | 586 | ++sbound; |
571 | break; | 587 | break; |
572 | case 'c': | 588 | case 'c': |
573 | if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); | 589 | if (g_connection_count > MAX_PEERS / 2) |
590 | exerr("Connection limit exceeded.\n"); | ||
574 | tmpport = 0; | 591 | tmpport = 0; |
575 | if( !scan_ip6_port( optarg, | 592 | if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) { |
576 | g_connections[g_connection_count].ip, | 593 | usage(argv[0]); |
577 | &g_connections[g_connection_count].port ) || | 594 | exit(1); |
578 | !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } | 595 | } |
579 | g_connections[g_connection_count++].state = FLAG_OUTGOING; | 596 | g_connections[g_connection_count++].state = FLAG_OUTGOING; |
580 | break; | 597 | break; |
581 | case 'L': | 598 | case 'L': |
582 | tmpport = 9696; | 599 | tmpport = 9696; |
583 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } | 600 | if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { |
584 | livesync_bind_mcast( serverip, tmpport); ++lbound; break; | 601 | usage(argv[0]); |
602 | exit(1); | ||
603 | } | ||
604 | livesync_bind_mcast(serverip, tmpport); | ||
605 | ++lbound; | ||
606 | break; | ||
585 | default: | 607 | default: |
586 | case '?': usage( argv[0] ); exit( 1 ); | 608 | case '?': |
609 | usage(argv[0]); | ||
610 | exit(1); | ||
587 | } | 611 | } |
588 | } | 612 | } |
589 | 613 | ||
590 | if( !lbound ) exerr( "No livesync port bound." ); | 614 | if (!lbound) |
591 | if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); | 615 | exerr("No livesync port bound."); |
592 | pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); | 616 | if (!g_connection_count && !sbound) |
593 | pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); | 617 | exerr("No streamsync port bound."); |
618 | pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL); | ||
619 | pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL); | ||
594 | 620 | ||
595 | server_mainloop(); | 621 | server_mainloop(); |
596 | return 0; | 622 | return 0; |
597 | } | 623 | } |
598 | 624 | ||
599 | static void * streamsync_worker( void * args ) { | 625 | static void *streamsync_worker(void *args) { |
600 | (void)args; | 626 | (void)args; |
601 | while( 1 ) { | 627 | while (1) { |
602 | int bucket; | 628 | int bucket; |
603 | /* For each bucket... */ | 629 | /* For each bucket... */ |
604 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 630 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { |
605 | /* Get exclusive access to that bucket */ | 631 | /* Get exclusive access to that bucket */ |
606 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | 632 | ot_vector *torrents_list = mutex_bucket_lock(bucket); |
607 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; | 633 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; |
608 | size_t mem, mem_a = 0, mem_b = 0; | 634 | size_t mem, mem_a = 0, mem_b = 0; |
609 | uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; | 635 | uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; |
610 | 636 | ||
611 | if( !torrents_list->size ) goto unlock_continue; | 637 | if (!torrents_list->size) |
638 | goto unlock_continue; | ||
612 | 639 | ||
613 | /* For each torrent in this bucket.. */ | 640 | /* For each torrent in this bucket.. */ |
614 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 641 | for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { |
615 | /* Address torrents members */ | 642 | /* Address torrents members */ |
616 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | 643 | ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list; |
617 | switch( peer_list->peer_count ) { | 644 | switch (peer_list->peer_count) { |
618 | case 2: count_two++; break; | 645 | case 2: |
619 | case 1: count_one++; break; | 646 | count_two++; |
620 | case 0: break; | 647 | break; |
621 | default: count_def++; | 648 | case 1: |
622 | count_peers += peer_list->peer_count; | 649 | count_one++; |
650 | break; | ||
651 | case 0: | ||
652 | break; | ||
653 | default: | ||
654 | count_def++; | ||
655 | count_peers += peer_list->peer_count; | ||
623 | } | 656 | } |
624 | } | 657 | } |
625 | 658 | ||
626 | /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ | 659 | /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ |
627 | mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) + | 660 | mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7; |
628 | ( count_one + 2 * count_two + count_peers ) * 7; | 661 | |
629 | 662 | fprintf(stderr, "Mem: %zd\n", mem); | |
630 | fprintf( stderr, "Mem: %zd\n", mem ); | 663 | |
631 | 664 | ptr = ptr_a = ptr_b = ptr_c = malloc(mem); | |
632 | ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); | 665 | if (!ptr) |
633 | if( !ptr ) goto unlock_continue; | 666 | goto unlock_continue; |
634 | 667 | ||
635 | if( count_one > 4 || !count_def ) { | 668 | if (count_one > 4 || !count_def) { |
636 | mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 ); | 669 | mem_a = 1 + 1 + 2 + count_one * (19 + 7); |
637 | ptr_b += mem_a; ptr_c += mem_a; | 670 | ptr_b += mem_a; |
638 | ptr_a[0] = 1; /* Offset 0: packet type 1 */ | 671 | ptr_c += mem_a; |
639 | ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 672 | ptr_a[0] = 1; /* Offset 0: packet type 1 */ |
640 | ptr_a[2] = count_one >> 8; | 673 | ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
641 | ptr_a[3] = count_one & 255; | 674 | ptr_a[2] = count_one >> 8; |
642 | ptr_a += 4; | 675 | ptr_a[3] = count_one & 255; |
676 | ptr_a += 4; | ||
643 | } else | 677 | } else |
644 | count_def += count_one; | 678 | count_def += count_one; |
645 | 679 | ||
646 | if( count_two > 4 || !count_def ) { | 680 | if (count_two > 4 || !count_def) { |
647 | mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 ); | 681 | mem_b = 1 + 1 + 2 + count_two * (19 + 14); |
648 | ptr_c += mem_b; | 682 | ptr_c += mem_b; |
649 | ptr_b[0] = 2; /* Offset 0: packet type 2 */ | 683 | ptr_b[0] = 2; /* Offset 0: packet type 2 */ |
650 | ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 684 | ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
651 | ptr_b[2] = count_two >> 8; | 685 | ptr_b[2] = count_two >> 8; |
652 | ptr_b[3] = count_two & 255; | 686 | ptr_b[3] = count_two & 255; |
653 | ptr_b += 4; | 687 | ptr_b += 4; |
654 | } else | 688 | } else |
655 | count_def += count_two; | 689 | count_def += count_two; |
656 | 690 | ||
657 | if( count_def ) { | 691 | if (count_def) { |
658 | ptr_c[0] = 0; /* Offset 0: packet type 0 */ | 692 | ptr_c[0] = 0; /* Offset 0: packet type 0 */ |
659 | ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 693 | ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
660 | ptr_c[2] = count_def >> 8; | 694 | ptr_c[2] = count_def >> 8; |
661 | ptr_c[3] = count_def & 255; | 695 | ptr_c[3] = count_def & 255; |
662 | ptr_c += 4; | 696 | ptr_c += 4; |
663 | } | 697 | } |
664 | 698 | ||
665 | /* For each torrent in this bucket.. */ | 699 | /* For each torrent in this bucket.. */ |
666 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 700 | for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { |
667 | /* Address torrents members */ | 701 | /* Address torrents members */ |
668 | ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; | 702 | ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset; |
669 | ot_peerlist *peer_list = torrent->peer_list; | 703 | ot_peerlist *peer_list = torrent->peer_list; |
670 | ot_peer *peers = (ot_peer*)(peer_list->peers.data); | 704 | ot_peer *peers = (ot_peer *)(peer_list->peers.data); |
671 | uint8_t **dst; | 705 | uint8_t **dst; |
672 | 706 | ||
673 | /* Determine destination slot */ | 707 | /* Determine destination slot */ |
674 | count_peers = peer_list->peer_count; | 708 | count_peers = peer_list->peer_count; |
675 | switch( count_peers ) { | 709 | switch (count_peers) { |
676 | case 0: continue; | 710 | case 0: |
677 | case 1: dst = mem_a ? &ptr_a : &ptr_c; break; | 711 | continue; |
678 | case 2: dst = mem_b ? &ptr_b : &ptr_c; break; | 712 | case 1: |
679 | default: dst = &ptr_c; break; | 713 | dst = mem_a ? &ptr_a : &ptr_c; |
714 | break; | ||
715 | case 2: | ||
716 | dst = mem_b ? &ptr_b : &ptr_c; | ||
717 | break; | ||
718 | default: | ||
719 | dst = &ptr_c; | ||
720 | break; | ||
680 | } | 721 | } |
681 | 722 | ||
682 | /* Copy tail of info_hash, advance pointer */ | 723 | /* Copy tail of info_hash, advance pointer */ |
683 | memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1); | 724 | memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1); |
684 | *dst += sizeof( ot_hash ) - 1; | 725 | *dst += sizeof(ot_hash) - 1; |
685 | 726 | ||
686 | /* Encode peer count */ | 727 | /* Encode peer count */ |
687 | if( dst == &ptr_c ) | 728 | if (dst == &ptr_c) |
688 | while( count_peers ) { | 729 | while (count_peers) { |
689 | if( count_peers <= 0x7f ) | 730 | if (count_peers <= 0x7f) |
690 | *(*dst)++ = count_peers; | 731 | *(*dst)++ = count_peers; |
691 | else | 732 | else |
692 | *(*dst)++ = 0x80 | ( count_peers & 0x7f ); | 733 | *(*dst)++ = 0x80 | (count_peers & 0x7f); |
693 | count_peers >>= 7; | 734 | count_peers >>= 7; |
694 | } | 735 | } |
695 | 736 | ||
696 | /* Copy peers */ | 737 | /* Copy peers */ |
697 | count_peers = peer_list->peer_count; | 738 | count_peers = peer_list->peer_count; |
698 | while( count_peers-- ) { | 739 | while (count_peers--) { |
699 | memcpy( *dst, peers++, OT_IP_SIZE + 3 ); | 740 | memcpy(*dst, peers++, OT_IP_SIZE + 3); |
700 | *dst += OT_IP_SIZE + 3; | 741 | *dst += OT_IP_SIZE + 3; |
701 | } | 742 | } |
702 | free_peerlist(peer_list); | 743 | free_peerlist(peer_list); |
703 | } | 744 | } |
704 | 745 | ||
705 | free( torrents_list->data ); | 746 | free(torrents_list->data); |
706 | memset( torrents_list, 0, sizeof(*torrents_list ) ); | 747 | memset(torrents_list, 0, sizeof(*torrents_list)); |
707 | unlock_continue: | 748 | unlock_continue: |
708 | mutex_bucket_unlock( bucket, 0 ); | 749 | mutex_bucket_unlock(bucket, 0); |
709 | 750 | ||
710 | if( ptr ) { | 751 | if (ptr) { |
711 | int i; | 752 | int i; |
712 | 753 | ||
713 | if( ptr_b > ptr_c ) ptr_c = ptr_b; | 754 | if (ptr_b > ptr_c) |
714 | if( ptr_a > ptr_c ) ptr_c = ptr_a; | 755 | ptr_c = ptr_b; |
756 | if (ptr_a > ptr_c) | ||
757 | ptr_c = ptr_a; | ||
715 | mem = ptr_c - ptr; | 758 | mem = ptr_c - ptr; |
716 | 759 | ||
717 | for( i=0; i < MAX_PEERS; ++i ) { | 760 | for (i = 0; i < MAX_PEERS; ++i) { |
718 | if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) { | 761 | if (PROXYPEER_ISCONNECTED(g_connections[i].state)) { |
719 | void *tmp = malloc( mem ); | 762 | void *tmp = malloc(mem); |
720 | if( tmp ) { | 763 | if (tmp) { |
721 | memcpy( tmp, ptr, mem ); | 764 | memcpy(tmp, ptr, mem); |
722 | iob_addbuf_free( &g_connections[i].outdata, tmp, mem ); | 765 | iob_addbuf_free(&g_connections[i].outdata, tmp, mem); |
723 | io_wantwrite( g_connections[i].fd ); | 766 | io_wantwrite(g_connections[i].fd); |
724 | } | 767 | } |
725 | } | 768 | } |
726 | } | 769 | } |
727 | 770 | ||
728 | free( ptr ); | 771 | free(ptr); |
729 | } | 772 | } |
730 | usleep( OT_SYNC_SLEEP ); | 773 | usleep(OT_SYNC_SLEEP); |
731 | } | 774 | } |
732 | } | 775 | } |
733 | return 0; | 776 | return 0; |
734 | } | 777 | } |
735 | 778 | ||
736 | static void livesync_issue_peersync( ) { | 779 | static void livesync_issue_peersync() { |
737 | socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, | 780 | socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT); |
738 | groupip_1, LIVESYNC_PORT); | 781 | g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t); |
739 | g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
740 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | 782 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; |
741 | } | 783 | } |
742 | 784 | ||
743 | void livesync_ticker( ) { | 785 | void livesync_ticker() { |
744 | /* livesync_issue_peersync sets g_next_packet_time */ | 786 | /* livesync_issue_peersync sets g_next_packet_time */ |
745 | if( time(NULL) > g_next_packet_time && | 787 | if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id)) |
746 | g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) | ||
747 | livesync_issue_peersync(); | 788 | livesync_issue_peersync(); |
748 | } | 789 | } |
749 | 790 | ||
750 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { | 791 | static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) { |
751 | // unsigned int i; | 792 | // unsigned int i; |
752 | 793 | ||
753 | *g_peerbuffer_pos = prefix; | 794 | *g_peerbuffer_pos = prefix; |
754 | memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); | 795 | memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1); |
755 | memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); | 796 | memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1); |
756 | 797 | ||
757 | #if 0 | 798 | #if 0 |
758 | /* Dump info_hash */ | 799 | /* Dump info_hash */ |
@@ -767,77 +808,84 @@ static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *pee | |||
767 | #endif | 808 | #endif |
768 | g_peerbuffer_pos += sizeof(ot_peer); | 809 | g_peerbuffer_pos += sizeof(ot_peer); |
769 | 810 | ||
770 | if( g_peerbuffer_pos >= g_peerbuffer_highwater ) | 811 | if (g_peerbuffer_pos >= g_peerbuffer_highwater) |
771 | livesync_issue_peersync(); | 812 | livesync_issue_peersync(); |
772 | } | 813 | } |
773 | 814 | ||
774 | static void process_indata( proxy_peer * peer ) { | 815 | static void process_indata(proxy_peer *peer) { |
775 | size_t consumed, peers; | 816 | size_t consumed, peers; |
776 | uint8_t *data = peer->indata, *hash; | 817 | uint8_t *data = peer->indata, *hash; |
777 | uint8_t *dataend = data + peer->indata_length; | 818 | uint8_t *dataend = data + peer->indata_length; |
778 | 819 | ||
779 | while( 1 ) { | 820 | while (1) { |
780 | /* If we're not inside of a packet, make a new one */ | 821 | /* If we're not inside of a packet, make a new one */ |
781 | if( !peer->packet_tcount ) { | 822 | if (!peer->packet_tcount) { |
782 | /* Ensure the header is complete or postpone processing */ | 823 | /* Ensure the header is complete or postpone processing */ |
783 | if( data + 4 > dataend ) break; | 824 | if (data + 4 > dataend) |
784 | peer->packet_type = data[0]; | 825 | break; |
785 | peer->packet_tprefix = data[1]; | 826 | peer->packet_type = data[0]; |
786 | peer->packet_tcount = data[2] * 256 + data[3]; | 827 | peer->packet_tprefix = data[1]; |
787 | data += 4; | 828 | peer->packet_tcount = data[2] * 256 + data[3]; |
788 | printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount ); | 829 | data += 4; |
830 | printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount); | ||
789 | } | 831 | } |
790 | 832 | ||
791 | /* Ensure size for a minimal torrent block */ | 833 | /* Ensure size for a minimal torrent block */ |
792 | if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break; | 834 | if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend) |
835 | break; | ||
793 | 836 | ||
794 | /* Advance pointer to peer count or peers */ | 837 | /* Advance pointer to peer count or peers */ |
795 | hash = data; | 838 | hash = data; |
796 | data += sizeof(ot_hash) - 1; | 839 | data += sizeof(ot_hash) - 1; |
797 | 840 | ||
798 | /* Type 0 has peer count encoded before each peers */ | 841 | /* Type 0 has peer count encoded before each peers */ |
799 | peers = peer->packet_type; | 842 | peers = peer->packet_type; |
800 | if( !peers ) { | 843 | if (!peers) { |
801 | int shift = 0; | 844 | int shift = 0; |
802 | do peers |= ( 0x7f & *data ) << ( 7 * shift ); | 845 | do |
803 | while ( *(data++) & 0x80 && shift++ < 6 ); | 846 | peers |= (0x7f & *data) << (7 * shift); |
847 | while (*(data++) & 0x80 && shift++ < 6); | ||
804 | } | 848 | } |
805 | #if 0 | 849 | #if 0 |
806 | printf( "peers: %zd\n", peers ); | 850 | printf( "peers: %zd\n", peers ); |
807 | #endif | 851 | #endif |
808 | /* Ensure enough data being read to hold all peers */ | 852 | /* Ensure enough data being read to hold all peers */ |
809 | if( data + (OT_IP_SIZE + 3) * peers > dataend ) { | 853 | if (data + (OT_IP_SIZE + 3) * peers > dataend) { |
810 | data = hash; | 854 | data = hash; |
811 | break; | 855 | break; |
812 | } | 856 | } |
813 | while( peers-- ) { | 857 | while (peers--) { |
814 | livesync_proxytell( peer->packet_tprefix, hash, data ); | 858 | livesync_proxytell(peer->packet_tprefix, hash, data); |
815 | data += OT_IP_SIZE + 3; | 859 | data += OT_IP_SIZE + 3; |
816 | } | 860 | } |
817 | --peer->packet_tcount; | 861 | --peer->packet_tcount; |
818 | } | 862 | } |
819 | 863 | ||
820 | consumed = data - peer->indata; | 864 | consumed = data - peer->indata; |
821 | memmove( peer->indata, data, peer->indata_length - consumed ); | 865 | memmove(peer->indata, data, peer->indata_length - consumed); |
822 | peer->indata_length -= consumed; | 866 | peer->indata_length -= consumed; |
823 | } | 867 | } |
824 | 868 | ||
825 | static void * livesync_worker( void * args ) { | 869 | static void *livesync_worker(void *args) { |
826 | (void)args; | 870 | (void)args; |
827 | while( 1 ) { | 871 | while (1) { |
828 | ot_ip6 in_ip; uint16_t in_port; | 872 | ot_ip6 in_ip; |
829 | size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); | 873 | uint16_t in_port; |
874 | size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port); | ||
830 | 875 | ||
831 | /* Expect at least tracker id and packet type */ | 876 | /* Expect at least tracker id and packet type */ |
832 | if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) | 877 | if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t))) |
833 | continue; | 878 | continue; |
834 | if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { | 879 | if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) { |
835 | /* drop packet coming from ourselves */ | 880 | /* drop packet coming from ourselves */ |
836 | continue; | 881 | continue; |
837 | } | 882 | } |
838 | switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { | 883 | switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) { |
839 | case OT_SYNC_PEER: | 884 | case OT_SYNC_PEER4: |
840 | livesync_handle_peersync( datalen ); | 885 | livesync_handle_peersync(datalen, OT_PEER_SIZE4); |
886 | break; | ||
887 | case OT_SYNC_PEER6: | ||
888 | livesync_handle_peersync(datalen, OT_PEER_SIZE6); | ||
841 | break; | 889 | break; |
842 | default: | 890 | default: |
843 | // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); | 891 | // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); |