summaryrefslogtreecommitdiff
path: root/proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.c')
-rw-r--r--proxy.c852
1 files changed, 450 insertions, 402 deletions
diff --git a/proxy.c b/proxy.c
index 619b08a..9946240 100644
--- a/proxy.c
+++ b/proxy.c
@@ -4,33 +4,33 @@
4 $Id$ */ 4 $Id$ */
5 5
6/* System */ 6/* System */
7#include <arpa/inet.h>
8#include <ctype.h>
9#include <errno.h>
10#include <pthread.h>
11#include <pwd.h>
12#include <signal.h>
7#include <stdint.h> 13#include <stdint.h>
14#include <stdio.h>
8#include <stdlib.h> 15#include <stdlib.h>
9#include <string.h> 16#include <string.h>
10#include <arpa/inet.h>
11#include <sys/socket.h> 17#include <sys/socket.h>
12#include <unistd.h> 18#include <unistd.h>
13#include <errno.h>
14#include <signal.h>
15#include <stdio.h>
16#include <pwd.h>
17#include <ctype.h>
18#include <pthread.h>
19 19
20/* Libowfat */ 20/* Libowfat */
21#include "socket.h" 21#include "byte.h"
22#include "io.h" 22#include "io.h"
23#include "iob.h" 23#include "iob.h"
24#include "byte.h"
25#include "scan.h"
26#include "ip6.h" 24#include "ip6.h"
27#include "ndelay.h" 25#include "ndelay.h"
26#include "scan.h"
27#include "socket.h"
28 28
29/* Opentracker */ 29/* Opentracker */
30#include "trackerlogic.h"
31#include "ot_vector.h"
32#include "ot_mutex.h" 30#include "ot_mutex.h"
33#include "ot_stats.h" 31#include "ot_stats.h"
32#include "ot_vector.h"
33#include "trackerlogic.h"
34 34
35#ifndef WANT_SYNC_LIVE 35#ifndef WANT_SYNC_LIVE
36#define WANT_SYNC_LIVE 36#define WANT_SYNC_LIVE
@@ -40,28 +40,28 @@
40ot_ip6 g_serverip; 40ot_ip6 g_serverip;
41uint16_t g_serverport = 9009; 41uint16_t g_serverport = 9009;
42uint32_t g_tracker_id; 42uint32_t g_tracker_id;
43char groupip_1[4] = { 224,0,23,5 }; 43char groupip_1[4] = {224, 0, 23, 5};
44int g_self_pipe[2]; 44int g_self_pipe[2];
45 45
46/* If you have more than 10 peers, don't use this proxy 46/* If you have more than 10 peers, don't use this proxy
47 Use 20 slots for 10 peers to have room for 10 incoming connection slots 47 Use 20 slots for 10 peers to have room for 10 incoming connection slots
48 */ 48 */
49#define MAX_PEERS 20 49#define MAX_PEERS 20
50 50
51#define LIVESYNC_INCOMING_BUFFSIZE (256*256) 51#define LIVESYNC_INCOMING_BUFFSIZE (256 * 256)
52#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) 52#define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256)
53 53
54#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 54#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
55#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 55#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash))
56#define LIVESYNC_MAXDELAY 15 /* seconds */ 56#define LIVESYNC_MAXDELAY 15 /* seconds */
57 57
58/* The amount of time a complete sync cycle should take */ 58/* The amount of time a complete sync cycle should take */
59#define OT_SYNC_INTERVAL_MINUTES 2 59#define OT_SYNC_INTERVAL_MINUTES 2
60 60
61/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ 61/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
62#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) 62#define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT))
63 63
64enum { OT_SYNC_PEER }; 64enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
65enum { FLAG_SERVERSOCKET = 1 }; 65enum { FLAG_SERVERSOCKET = 1 };
66 66
67/* For incoming packets */ 67/* For incoming packets */
@@ -75,145 +75,153 @@ static uint8_t *g_peerbuffer_pos;
75static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; 75static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
76static ot_time g_next_packet_time; 76static ot_time g_next_packet_time;
77 77
78static void * livesync_worker( void * args ); 78static void *livesync_worker(void *args);
79static void * streamsync_worker( void * args ); 79static void *streamsync_worker(void *args);
80static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); 80static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer);
81 81
82void exerr( char * message ) { 82void exerr(char *message) {
83 fprintf( stderr, "%s\n", message ); 83 fprintf(stderr, "%s\n", message);
84 exit( 111 ); 84 exit(111);
85} 85}
86 86
87void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { 87void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) {
88 (void) event; 88 (void)event;
89 (void) proto; 89 (void)proto;
90 (void) event_data; 90 (void)event_data;
91} 91}
92 92
93void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { 93void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
94 char tmpip[4] = {0,0,0,0}; 94 char tmpip[4] = {0, 0, 0, 0};
95 char *v4ip; 95 char *v4ip;
96 96
97 if( !ip6_isv4mapped(ip)) 97 if (!ip6_isv4mapped(ip))
98 exerr("v6 mcast support not yet available."); 98 exerr("v6 mcast support not yet available.");
99 v4ip = ip+12; 99 v4ip = ip + 12;
100 100
101 if( g_socket_in != -1 ) 101 if (g_socket_in != -1)
102 exerr("Error: Livesync listen ip specified twice."); 102 exerr("Error: Livesync listen ip specified twice.");
103 103
104 if( ( g_socket_in = socket_udp4( )) < 0) 104 if ((g_socket_in = socket_udp4()) < 0)
105 exerr("Error: Cant create live sync incoming socket." ); 105 exerr("Error: Cant create live sync incoming socket.");
106 ndelay_off(g_socket_in); 106 ndelay_off(g_socket_in);
107 107
108 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) 108 if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1)
109 exerr("Error: Cant bind live sync incoming socket." ); 109 exerr("Error: Cant bind live sync incoming socket.");
110 110
111 if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) 111 if (socket_mcjoin4(g_socket_in, groupip_1, v4ip))
112 exerr("Error: Cant make live sync incoming socket join mcast group."); 112 exerr("Error: Cant make live sync incoming socket join mcast group.");
113 113
114 if( ( g_socket_out = socket_udp4()) < 0) 114 if ((g_socket_out = socket_udp4()) < 0)
115 exerr("Error: Cant create live sync outgoing socket." ); 115 exerr("Error: Cant create live sync outgoing socket.");
116 if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) 116 if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1)
117 exerr("Error: Cant bind live sync outgoing socket." ); 117 exerr("Error: Cant bind live sync outgoing socket.");
118 118
119 socket_mcttl4(g_socket_out, 1); 119 socket_mcttl4(g_socket_out, 1);
120 socket_mcloop4(g_socket_out, 1); 120 socket_mcloop4(g_socket_out, 1);
121} 121}
122 122
123size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { 123size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
124 int exactmatch; 124 int exactmatch;
125 ot_torrent *torrent; 125 ot_torrent *torrent;
126 ot_peer *peer_dest; 126 ot_peerlist *peer_list;
127 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 127 ot_peer *peer_dest;
128 128 ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
129 torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); 129 size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size);
130 if( !torrent ) 130
131 torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch);
132 if (!torrent)
131 return -1; 133 return -1;
132 134
133 if( !exactmatch ) { 135 if (!exactmatch) {
134 /* Create a new torrent entry, then */ 136 /* Create a new torrent entry, then */
135 memcpy( torrent->hash, hash, sizeof(ot_hash) ); 137 memcpy(torrent->hash, hash, sizeof(ot_hash));
136 138
137 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { 139 if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) {
138 vector_remove_torrent( torrents_list, torrent ); 140 vector_remove_torrent(torrents_list, torrent);
139 mutex_bucket_unlock_by_hash( hash, 0 ); 141 mutex_bucket_unlock_by_hash(hash, 0);
140 return -1; 142 return -1;
141 } 143 }
142 144
143 byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); 145 byte_zero(torrent->peer_list6, sizeof(ot_peerlist));
146 byte_zero(torrent->peer_list4, sizeof(ot_peerlist));
144 } 147 }
145 148
149 peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
150
146 /* Check for peer in torrent */ 151 /* Check for peer in torrent */
147 peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); 152 peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch);
148 if( !peer_dest ) { 153 if (!peer_dest) {
149 mutex_bucket_unlock_by_hash( hash, 0 ); 154 mutex_bucket_unlock_by_hash(hash, 0);
150 return -1; 155 return -1;
151 } 156 }
152 /* Tell peer that it's fresh */ 157 /* Tell peer that it's fresh */
153 OT_PEERTIME( peer ) = 0; 158 OT_PEERTIME(peer, peer_size) = 0;
154 159
155 /* If we hadn't had a match create peer there */ 160 /* If we hadn't had a match create peer there */
156 if( !exactmatch ) { 161 if (!exactmatch) {
157 torrent->peer_list->peer_count++; 162 peer_list->peer_count++;
158 if( OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) 163 if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING)
159 torrent->peer_list->seed_count++; 164 peer_list->seed_count++;
160 } 165 }
161 memcpy( peer_dest, peer, sizeof(ot_peer) ); 166 memcpy(peer_dest, peer, peer_size);
162 mutex_bucket_unlock_by_hash( hash, 0 ); 167 mutex_bucket_unlock_by_hash(hash, 0);
163 return 0; 168 return 0;
164} 169}
165 170
166size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) { 171size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
167 int exactmatch; 172 int exactmatch;
168 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 173 ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
169 ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); 174 ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch);
170 175
171 if( exactmatch ) { 176 if (exactmatch) {
172 ot_peerlist *peer_list = torrent->peer_list; 177 ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
173 switch( vector_remove_peer( &peer_list->peers, peer ) ) { 178 switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) {
174 case 2: peer_list->seed_count--; /* Intentional fallthrough */ 179 case 2:
175 case 1: peer_list->peer_count--; /* Intentional fallthrough */ 180 peer_list->seed_count--; /* Intentional fallthrough */
176 default: break; 181 case 1:
182 peer_list->peer_count--; /* Intentional fallthrough */
183 default:
184 break;
177 } 185 }
178 } 186 }
179 187
180 mutex_bucket_unlock_by_hash( hash, 0 ); 188 mutex_bucket_unlock_by_hash(hash, 0);
181 return 0; 189 return 0;
182} 190}
183 191
184void free_peerlist( ot_peerlist *peer_list ) { 192void free_peerlist(ot_peerlist *peer_list) {
185 if( peer_list->peers.data ) { 193 if (peer_list->peers.data) {
186 if( OT_PEERLIST_HASBUCKETS( peer_list ) ) { 194 if (OT_PEERLIST_HASBUCKETS(peer_list)) {
187 ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data); 195 ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data);
188 196
189 while( peer_list->peers.size-- ) 197 while (peer_list->peers.size--)
190 free( bucket_list++->data ); 198 free(bucket_list++->data);
191 } 199 }
192 free( peer_list->peers.data ); 200 free(peer_list->peers.data);
193 } 201 }
194 free( peer_list ); 202 free(peer_list);
195} 203}
196 204
197static void livesync_handle_peersync( ssize_t datalen ) { 205static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) {
198 int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 206 int off = sizeof(g_tracker_id) + sizeof(uint32_t);
199 207
200 fprintf( stderr, "." ); 208 fprintf(stderr, ".");
201 209
202 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { 210 while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) {
203 ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); 211 ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash));
204 ot_hash *hash = (ot_hash*)(g_inbuffer + off); 212 ot_hash *hash = (ot_hash *)(g_inbuffer + off);
205 213
206 if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) 214 if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED)
207 remove_peer_from_torrent_proxy( *hash, peer ); 215 remove_peer_from_torrent_proxy(*hash, peer, peer_size);
208 else 216 else
209 add_peer_to_torrent_proxy( *hash, peer ); 217 add_peer_to_torrent_proxy(*hash, peer, peer_size);
210 218
211 off += sizeof( ot_hash ) + sizeof( ot_peer ); 219 off += sizeof(ot_hash) + peer_size;
212 } 220 }
213} 221}
214 222
215int usage( char *self ) { 223int usage(char *self) {
216 fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self ); 224 fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self);
217 return 0; 225 return 0;
218} 226}
219 227
@@ -228,115 +236,115 @@ enum {
228 FLAG_MASK = 0x07 236 FLAG_MASK = 0x07
229}; 237};
230 238
231#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) 239#define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING)
232#define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED) 240#define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED)
233#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) 241#define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED)
234#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) 242#define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING)
235#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) 243#define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID)
236#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) 244#define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED)
237 245
238typedef struct { 246typedef struct {
239 int state; /* Whether we want to connect, how far our handshake is, etc. */ 247 int state; /* Whether we want to connect, how far our handshake is, etc. */
240 ot_ip6 ip; /* The peer to connect to */ 248 ot_ip6 ip; /* The peer to connect to */
241 uint16_t port; /* The peers port */ 249 uint16_t port; /* The peers port */
242 uint8_t indata[8192*16]; /* Any data not processed yet */ 250 uint8_t indata[8192 * 16]; /* Any data not processed yet */
243 size_t indata_length; /* Length of unprocessed data */ 251 size_t indata_length; /* Length of unprocessed data */
244 uint32_t tracker_id; /* How the other end greeted */ 252 uint32_t tracker_id; /* How the other end greeted */
245 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ 253 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
246 io_batch outdata; /* The iobatch containing our sync data */ 254 io_batch outdata; /* The iobatch containing our sync data */
247 255
248 size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ 256 size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
249 uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ 257 uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
250 uint8_t packet_type; /* Type of current packet */ 258 uint8_t packet_type; /* Type of current packet */
251 uint32_t packet_tid; /* Tracker id for current packet */ 259 uint32_t packet_tid; /* Tracker id for current packet */
252 260
253} proxy_peer; 261} proxy_peer;
254static void process_indata( proxy_peer * peer ); 262static void process_indata(proxy_peer *peer);
255 263
256void reset_info_block( proxy_peer * peer ) { 264void reset_info_block(proxy_peer *peer) {
257 peer->indata_length = 0; 265 peer->indata_length = 0;
258 peer->tracker_id = 0; 266 peer->tracker_id = 0;
259 peer->fd = -1; 267 peer->fd = -1;
260 peer->packet_tcount = 0; 268 peer->packet_tcount = 0;
261 iob_reset( &peer->outdata ); 269 iob_reset(&peer->outdata);
262 PROXYPEER_SETDISCONNECTED( peer->state ); 270 PROXYPEER_SETDISCONNECTED(peer->state);
263} 271}
264 272
265/* Number of connections to peers 273/* Number of connections to peers
266 * If a peer's IP is set, we try to reconnect, when the connection drops 274 * If a peer's IP is set, we try to reconnect, when the connection drops
267 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it 275 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
268 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match 276 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
269 * Reconnect attempts occur only twice a minute 277 * Reconnect attempts occur only twice a minute
270*/ 278 */
271static int g_connection_count; 279static int g_connection_count;
272static ot_time g_connection_reconn; 280static ot_time g_connection_reconn;
273static proxy_peer g_connections[MAX_PEERS]; 281static proxy_peer g_connections[MAX_PEERS];
274 282
275static void handle_reconnects( void ) { 283static void handle_reconnects(void) {
276 int i; 284 int i;
277 for( i=0; i<g_connection_count; ++i ) 285 for (i = 0; i < g_connection_count; ++i)
278 if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { 286 if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) {
279 int64 newfd = socket_tcp6( ); 287 int64 newfd = socket_tcp6();
280 fprintf( stderr, "(Re)connecting to peer..." ); 288 fprintf(stderr, "(Re)connecting to peer...");
281 if( newfd < 0 ) continue; /* No socket for you */ 289 if (newfd < 0)
290 continue; /* No socket for you */
282 io_fd(newfd); 291 io_fd(newfd);
283 if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { 292 if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) {
284 io_close( newfd ); 293 io_close(newfd);
285 continue; 294 continue;
286 } 295 }
287 if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && 296 if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) {
288 errno != EINPROGRESS && errno != EWOULDBLOCK ) {
289 close(newfd); 297 close(newfd);
290 continue; 298 continue;
291 } 299 }
292 io_wantwrite(newfd); /* So we will be informed when it is connected */ 300 io_wantwrite(newfd); /* So we will be informed when it is connected */
293 io_setcookie(newfd,g_connections+i); 301 io_setcookie(newfd, g_connections + i);
294 302
295 /* Prepare connection info block */ 303 /* Prepare connection info block */
296 reset_info_block( g_connections+i ); 304 reset_info_block(g_connections + i);
297 g_connections[i].fd = newfd; 305 g_connections[i].fd = newfd;
298 PROXYPEER_SETCONNECTING( g_connections[i].state ); 306 PROXYPEER_SETCONNECTING(g_connections[i].state);
299 } 307 }
300 g_connection_reconn = time(NULL) + 30; 308 g_connection_reconn = time(NULL) + 30;
301} 309}
302 310
303/* Handle incoming connection requests, check against whitelist */ 311/* Handle incoming connection requests, check against whitelist */
304static void handle_accept( int64 serversocket ) { 312static void handle_accept(int64 serversocket) {
305 int64 newfd; 313 int64 newfd;
306 ot_ip6 ip; 314 ot_ip6 ip;
307 uint16 port; 315 uint16 port;
308 316
309 while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { 317 while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) {
310 318
311 /* XXX some access control */ 319 /* XXX some access control */
312 320
313 /* Put fd into a non-blocking mode */ 321 /* Put fd into a non-blocking mode */
314 io_nonblock( newfd ); 322 io_nonblock(newfd);
315 323
316 if( !io_fd( newfd ) ) 324 if (!io_fd(newfd))
317 io_close( newfd ); 325 io_close(newfd);
318 else { 326 else {
319 /* Find a new home for our incoming connection */ 327 /* Find a new home for our incoming connection */
320 int i; 328 int i;
321 for( i=0; i<MAX_PEERS; ++i ) 329 for (i = 0; i < MAX_PEERS; ++i)
322 if( g_connections[i].state == FLAG_DISCONNECTED ) 330 if (g_connections[i].state == FLAG_DISCONNECTED)
323 break; 331 break;
324 if( i == MAX_PEERS ) { 332 if (i == MAX_PEERS) {
325 fprintf( stderr, "No room for incoming connection." ); 333 fprintf(stderr, "No room for incoming connection.");
326 close( newfd ); 334 close(newfd);
327 continue; 335 continue;
328 } 336 }
329 337
330 /* Prepare connection info block */ 338 /* Prepare connection info block */
331 reset_info_block( g_connections+i ); 339 reset_info_block(g_connections + i);
332 PROXYPEER_SETCONNECTING( g_connections[i].state ); 340 PROXYPEER_SETCONNECTING(g_connections[i].state);
333 g_connections[i].port = port; 341 g_connections[i].port = port;
334 g_connections[i].fd = newfd; 342 g_connections[i].fd = newfd;
335 343
336 io_setcookie( newfd, g_connections + i ); 344 io_setcookie(newfd, g_connections + i);
337 345
338 /* We expect the connecting side to begin with its tracker_id */ 346 /* We expect the connecting side to begin with its tracker_id */
339 io_wantread( newfd ); 347 io_wantread(newfd);
340 } 348 }
341 } 349 }
342 350
@@ -344,117 +352,116 @@ static void handle_accept( int64 serversocket ) {
344} 352}
345 353
346/* New sync data on the stream */ 354/* New sync data on the stream */
347static void handle_read( int64 peersocket ) { 355static void handle_read(int64 peersocket) {
348 int i; 356 int i;
349 int64 datalen; 357 int64 datalen;
350 uint32_t tracker_id; 358 uint32_t tracker_id;
351 proxy_peer *peer = io_getcookie( peersocket ); 359 proxy_peer *peer = io_getcookie(peersocket);
352 360
353 if( !peer ) { 361 if (!peer) {
354 /* Can't happen ;) */ 362 /* Can't happen ;) */
355 io_close( peersocket ); 363 io_close(peersocket);
356 return; 364 return;
357 } 365 }
358 switch( peer->state & FLAG_MASK ) { 366 switch (peer->state & FLAG_MASK) {
359 case FLAG_DISCONNECTED: 367 case FLAG_DISCONNECTED:
360 io_close( peersocket ); 368 io_close(peersocket);
361 break; /* Shouldnt happen */ 369 break; /* Shouldnt happen */
362 case FLAG_CONNECTING: 370 case FLAG_CONNECTING:
363 case FLAG_WAITTRACKERID: 371 case FLAG_WAITTRACKERID:
364 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) 372 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
365 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ 373 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
366 if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) 374 if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id))
367 goto close_socket; 375 goto close_socket;
368 376
369 /* See, if we already have a connection to that peer */ 377 /* See, if we already have a connection to that peer */
370 for( i=0; i<MAX_PEERS; ++i ) 378 for (i = 0; i < MAX_PEERS; ++i)
371 if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && 379 if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) {
372 g_connections[i].tracker_id == tracker_id ) { 380 fprintf(stderr, "Peer already connected. Closing connection.\n");
373 fprintf( stderr, "Peer already connected. Closing connection.\n" );
374 goto close_socket; 381 goto close_socket;
375 } 382 }
376 383
377 /* Also no need for soliloquy */ 384 /* Also no need for soliloquy */
378 if( tracker_id == g_tracker_id ) 385 if (tracker_id == g_tracker_id)
379 goto close_socket; 386 goto close_socket;
380 387
381 /* The new connection is good, send our tracker_id on incoming connections */ 388 /* The new connection is good, send our tracker_id on incoming connections */
382 if( peer->state == FLAG_CONNECTING ) 389 if (peer->state == FLAG_CONNECTING)
383 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) ) 390 if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id))
384 goto close_socket; 391 goto close_socket;
385 392
386 peer->tracker_id = tracker_id; 393 peer->tracker_id = tracker_id;
387 PROXYPEER_SETCONNECTED( peer->state ); 394 PROXYPEER_SETCONNECTED(peer->state);
388 395
389 if( peer->state & FLAG_OUTGOING ) 396 if (peer->state & FLAG_OUTGOING)
390 fprintf( stderr, "succeeded.\n" ); 397 fprintf(stderr, "succeeded.\n");
391 else 398 else
392 fprintf( stderr, "Incoming connection successful.\n" ); 399 fprintf(stderr, "Incoming connection successful.\n");
393 400
394 break; 401 break;
395close_socket: 402 close_socket:
396 fprintf( stderr, "Handshake incomplete, closing socket\n" ); 403 fprintf(stderr, "Handshake incomplete, closing socket\n");
397 io_close( peersocket ); 404 io_close(peersocket);
398 reset_info_block( peer ); 405 reset_info_block(peer);
399 break; 406 break;
400 case FLAG_CONNECTED: 407 case FLAG_CONNECTED:
401 /* Here we acutally expect data from peer 408 /* Here we acutally expect data from peer
402 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ 409 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
403 datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); 410 datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length);
404 if( !datalen || datalen < -1 ) { 411 if (!datalen || datalen < -1) {
405 fprintf( stderr, "Connection closed by remote peer.\n" ); 412 fprintf(stderr, "Connection closed by remote peer.\n");
406 io_close( peersocket ); 413 io_close(peersocket);
407 reset_info_block( peer ); 414 reset_info_block(peer);
408 } else if( datalen > 0 ) { 415 } else if (datalen > 0) {
409 peer->indata_length += datalen; 416 peer->indata_length += datalen;
410 process_indata( peer ); 417 process_indata(peer);
411 } 418 }
412 break; 419 break;
413 } 420 }
414} 421}
415 422
416/* Can write new sync data to the stream */ 423/* Can write new sync data to the stream */
417static void handle_write( int64 peersocket ) { 424static void handle_write(int64 peersocket) {
418 proxy_peer *peer = io_getcookie( peersocket ); 425 proxy_peer *peer = io_getcookie(peersocket);
419 426
420 if( !peer ) { 427 if (!peer) {
421 /* Can't happen ;) */ 428 /* Can't happen ;) */
422 io_close( peersocket ); 429 io_close(peersocket);
423 return; 430 return;
424 } 431 }
425 432
426 switch( peer->state & FLAG_MASK ) { 433 switch (peer->state & FLAG_MASK) {
427 case FLAG_DISCONNECTED: 434 case FLAG_DISCONNECTED:
428 default: /* Should not happen */ 435 default: /* Should not happen */
429 io_close( peersocket ); 436 io_close(peersocket);
430 break; 437 break;
431 case FLAG_CONNECTING: 438 case FLAG_CONNECTING:
432 /* Ensure that the connection is established and handle connection error */ 439 /* Ensure that the connection is established and handle connection error */
433 if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { 440 if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) {
434 fprintf( stderr, "failed\n" ); 441 fprintf(stderr, "failed\n");
435 reset_info_block( peer ); 442 reset_info_block(peer);
436 io_close( peersocket ); 443 io_close(peersocket);
437 break; 444 break;
438 } 445 }
439 446
440 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) { 447 if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) {
441 PROXYPEER_SETWAITTRACKERID( peer->state ); 448 PROXYPEER_SETWAITTRACKERID(peer->state);
442 io_dontwantwrite( peersocket ); 449 io_dontwantwrite(peersocket);
443 io_wantread( peersocket ); 450 io_wantread(peersocket);
444 } else { 451 } else {
445 fprintf( stderr, "Handshake incomplete, closing socket\n" ); 452 fprintf(stderr, "Handshake incomplete, closing socket\n");
446 io_close( peersocket ); 453 io_close(peersocket);
447 reset_info_block( peer ); 454 reset_info_block(peer);
448 } 455 }
449 break; 456 break;
450 case FLAG_CONNECTED: 457 case FLAG_CONNECTED:
451 switch( iob_send( peersocket, &peer->outdata ) ) { 458 switch (iob_send(peersocket, &peer->outdata)) {
452 case 0: /* all data sent */ 459 case 0: /* all data sent */
453 io_dontwantwrite( peersocket ); 460 io_dontwantwrite(peersocket);
454 break; 461 break;
455 case -3: /* an error occured */ 462 case -3: /* an error occured */
456 io_close( peersocket ); 463 io_close(peersocket);
457 reset_info_block( peer ); 464 reset_info_block(peer);
458 break; 465 break;
459 default: /* Normal operation or eagain */ 466 default: /* Normal operation or eagain */
460 break; 467 break;
@@ -469,290 +476,324 @@ static void server_mainloop() {
469 int64 sock; 476 int64 sock;
470 477
471 /* inlined livesync_init() */ 478 /* inlined livesync_init() */
472 memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); 479 memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start));
473 g_peerbuffer_pos = g_peerbuffer_start; 480 g_peerbuffer_pos = g_peerbuffer_start;
474 memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 481 memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id));
475 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); 482 uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER);
476 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); 483 g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t);
477 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; 484 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
478 485
479 while(1) { 486 while (1) {
480 /* See, if we need to connect to anyone */ 487 /* See if we need to connect to anyone */
481 if( time(NULL) > g_connection_reconn ) 488 if (time(NULL) > g_connection_reconn)
482 handle_reconnects( ); 489 handle_reconnects();
483 490
484 /* Wait for io events until next approx reconn check time */ 491 /* Wait for io events until next approx reconn check time */
485 io_waituntil2( 30*1000 ); 492 io_waituntil2(30 * 1000);
486 493
487 /* Loop over readable sockets */ 494 /* Loop over readable sockets */
488 while( ( sock = io_canread( ) ) != -1 ) { 495 while ((sock = io_canread()) != -1) {
489 const void *cookie = io_getcookie( sock ); 496 const void *cookie = io_getcookie(sock);
490 if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) 497 if ((uintptr_t)cookie == FLAG_SERVERSOCKET)
491 handle_accept( sock ); 498 handle_accept(sock);
492 else 499 else
493 handle_read( sock ); 500 handle_read(sock);
494 } 501 }
495 502
496 /* Loop over writable sockets */ 503 /* Loop over writable sockets */
497 while( ( sock = io_canwrite( ) ) != -1 ) 504 while ((sock = io_canwrite()) != -1)
498 handle_write( sock ); 505 handle_write(sock);
499 506
500 livesync_ticker( ); 507 livesync_ticker();
501 } 508 }
502} 509}
503 510
504static void panic( const char *routine ) { 511static void panic(const char *routine) {
505 fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); 512 fprintf(stderr, "%s: %s\n", routine, strerror(errno));
506 exit( 111 ); 513 exit(111);
507} 514}
508 515
509static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { 516static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) {
510 int64 sock = socket_tcp6( ); 517 int64 sock = socket_tcp6();
511 518
512 if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) 519 if (socket_bind6_reuse(sock, ip, port, 0) == -1)
513 panic( "socket_bind6_reuse" ); 520 panic("socket_bind6_reuse");
514 521
515 if( socket_listen( sock, SOMAXCONN) == -1 ) 522 if (socket_listen(sock, SOMAXCONN) == -1)
516 panic( "socket_listen" ); 523 panic("socket_listen");
517 524
518 if( !io_fd( sock ) ) 525 if (!io_fd(sock))
519 panic( "io_fd" ); 526 panic("io_fd");
520 527
521 io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); 528 io_setcookie(sock, (void *)FLAG_SERVERSOCKET);
522 io_wantread( sock ); 529 io_wantread(sock);
523 return sock; 530 return sock;
524} 531}
525 532
526 533static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) {
527static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
528 const char *s = src; 534 const char *s = src;
529 int off, bracket = 0; 535 int off, bracket = 0;
530 while( isspace(*s) ) ++s; 536 while (isspace(*s))
531 if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ 537 ++s;
532 if( !(off = scan_ip6( s, ip ) ) ) 538 if (*s == '[')
539 ++s, ++bracket; /* for v6 style notation */
540 if (!(off = scan_ip6(s, ip)))
533 return 0; 541 return 0;
534 s += off; 542 s += off;
535 if( *s == 0 || isspace(*s)) return s-src; 543 if (*s == 0 || isspace(*s))
536 if( *s == ']' && bracket ) ++s; 544 return s - src;
537 if( !ip6_isv4mapped(ip)){ 545 if (*s == ']' && bracket)
538 if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; 546 ++s;
547 if (!ip6_isv4mapped(ip)) {
548 if ((bracket && *(s) != ':') || (*(s) != '.'))
549 return 0;
539 s++; 550 s++;
540 } else { 551 } else {
541 if( *(s++) != ':' ) return 0; 552 if (*(s++) != ':')
553 return 0;
542 } 554 }
543 if( !(off = scan_ushort (s, port ) ) ) 555 if (!(off = scan_ushort(s, port)))
544 return 0; 556 return 0;
545 return off+s-src; 557 return off + s - src;
546} 558}
547 559
548int main( int argc, char **argv ) { 560int main(int argc, char **argv) {
549 static pthread_t sync_in_thread_id; 561 static pthread_t sync_in_thread_id;
550 static pthread_t sync_out_thread_id; 562 static pthread_t sync_out_thread_id;
551 ot_ip6 serverip; 563 ot_ip6 serverip;
552 uint16_t tmpport; 564 uint16_t tmpport;
553 int scanon = 1, lbound = 0, sbound = 0; 565 int scanon = 1, lbound = 0, sbound = 0;
554 566
555 srandom( time(NULL) ); 567 srandom(time(NULL));
556#ifdef WANT_ARC4RANDOM 568#ifdef WANT_ARC4RANDOM
557 g_tracker_id = arc4random(); 569 g_tracker_id = arc4random();
558#else 570#else
559 g_tracker_id = random(); 571 g_tracker_id = random();
560#endif 572#endif
561 noipv6=1;
562 573
563 while( scanon ) { 574 while (scanon) {
564 switch( getopt( argc, argv, ":l:c:L:h" ) ) { 575 switch (getopt(argc, argv, ":l:c:L:h")) {
565 case -1: scanon = 0; break; 576 case -1:
577 scanon = 0;
578 break;
566 case 'l': 579 case 'l':
567 tmpport = 0; 580 tmpport = 0;
568 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } 581 if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
569 ot_try_bind( serverip, tmpport ); 582 usage(argv[0]);
583 exit(1);
584 }
585 ot_try_bind(serverip, tmpport);
570 ++sbound; 586 ++sbound;
571 break; 587 break;
572 case 'c': 588 case 'c':
573 if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); 589 if (g_connection_count > MAX_PEERS / 2)
590 exerr("Connection limit exceeded.\n");
574 tmpport = 0; 591 tmpport = 0;
575 if( !scan_ip6_port( optarg, 592 if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) {
576 g_connections[g_connection_count].ip, 593 usage(argv[0]);
577 &g_connections[g_connection_count].port ) || 594 exit(1);
578 !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } 595 }
579 g_connections[g_connection_count++].state = FLAG_OUTGOING; 596 g_connections[g_connection_count++].state = FLAG_OUTGOING;
580 break; 597 break;
581 case 'L': 598 case 'L':
582 tmpport = 9696; 599 tmpport = 9696;
583 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } 600 if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
584 livesync_bind_mcast( serverip, tmpport); ++lbound; break; 601 usage(argv[0]);
602 exit(1);
603 }
604 livesync_bind_mcast(serverip, tmpport);
605 ++lbound;
606 break;
585 default: 607 default:
586 case '?': usage( argv[0] ); exit( 1 ); 608 case '?':
609 usage(argv[0]);
610 exit(1);
587 } 611 }
588 } 612 }
589 613
590 if( !lbound ) exerr( "No livesync port bound." ); 614 if (!lbound)
591 if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); 615 exerr("No livesync port bound.");
592 pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); 616 if (!g_connection_count && !sbound)
593 pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); 617 exerr("No streamsync port bound.");
618 pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL);
619 pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL);
594 620
595 server_mainloop(); 621 server_mainloop();
596 return 0; 622 return 0;
597} 623}
598 624
599static void * streamsync_worker( void * args ) { 625static void *streamsync_worker(void *args) {
600 (void)args; 626 (void)args;
601 while( 1 ) { 627 while (1) {
602 int bucket; 628 int bucket;
603 /* For each bucket... */ 629 /* For each bucket... */
604 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 630 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
605 /* Get exclusive access to that bucket */ 631 /* Get exclusive access to that bucket */
606 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 632 ot_vector *torrents_list = mutex_bucket_lock(bucket);
607 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; 633 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
608 size_t mem, mem_a = 0, mem_b = 0; 634 size_t mem, mem_a = 0, mem_b = 0;
609 uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; 635 uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
610 636
611 if( !torrents_list->size ) goto unlock_continue; 637 if (!torrents_list->size)
638 goto unlock_continue;
612 639
613 /* For each torrent in this bucket.. */ 640 /* For each torrent in this bucket.. */
614 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 641 for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
615 /* Address torrents members */ 642 /* Address torrents members */
616 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; 643 ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list;
617 switch( peer_list->peer_count ) { 644 switch (peer_list->peer_count) {
618 case 2: count_two++; break; 645 case 2:
619 case 1: count_one++; break; 646 count_two++;
620 case 0: break; 647 break;
621 default: count_def++; 648 case 1:
622 count_peers += peer_list->peer_count; 649 count_one++;
650 break;
651 case 0:
652 break;
653 default:
654 count_def++;
655 count_peers += peer_list->peer_count;
623 } 656 }
624 } 657 }
625 658
626 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ 659 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
627 mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) + 660 mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7;
628 ( count_one + 2 * count_two + count_peers ) * 7; 661
629 662 fprintf(stderr, "Mem: %zd\n", mem);
630 fprintf( stderr, "Mem: %zd\n", mem ); 663
631 664 ptr = ptr_a = ptr_b = ptr_c = malloc(mem);
632 ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); 665 if (!ptr)
633 if( !ptr ) goto unlock_continue; 666 goto unlock_continue;
634 667
635 if( count_one > 4 || !count_def ) { 668 if (count_one > 4 || !count_def) {
636 mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 ); 669 mem_a = 1 + 1 + 2 + count_one * (19 + 7);
637 ptr_b += mem_a; ptr_c += mem_a; 670 ptr_b += mem_a;
638 ptr_a[0] = 1; /* Offset 0: packet type 1 */ 671 ptr_c += mem_a;
639 ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 672 ptr_a[0] = 1; /* Offset 0: packet type 1 */
640 ptr_a[2] = count_one >> 8; 673 ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
641 ptr_a[3] = count_one & 255; 674 ptr_a[2] = count_one >> 8;
642 ptr_a += 4; 675 ptr_a[3] = count_one & 255;
676 ptr_a += 4;
643 } else 677 } else
644 count_def += count_one; 678 count_def += count_one;
645 679
646 if( count_two > 4 || !count_def ) { 680 if (count_two > 4 || !count_def) {
647 mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 ); 681 mem_b = 1 + 1 + 2 + count_two * (19 + 14);
648 ptr_c += mem_b; 682 ptr_c += mem_b;
649 ptr_b[0] = 2; /* Offset 0: packet type 2 */ 683 ptr_b[0] = 2; /* Offset 0: packet type 2 */
650 ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 684 ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
651 ptr_b[2] = count_two >> 8; 685 ptr_b[2] = count_two >> 8;
652 ptr_b[3] = count_two & 255; 686 ptr_b[3] = count_two & 255;
653 ptr_b += 4; 687 ptr_b += 4;
654 } else 688 } else
655 count_def += count_two; 689 count_def += count_two;
656 690
657 if( count_def ) { 691 if (count_def) {
658 ptr_c[0] = 0; /* Offset 0: packet type 0 */ 692 ptr_c[0] = 0; /* Offset 0: packet type 0 */
659 ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 693 ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
660 ptr_c[2] = count_def >> 8; 694 ptr_c[2] = count_def >> 8;
661 ptr_c[3] = count_def & 255; 695 ptr_c[3] = count_def & 255;
662 ptr_c += 4; 696 ptr_c += 4;
663 } 697 }
664 698
665 /* For each torrent in this bucket.. */ 699 /* For each torrent in this bucket.. */
666 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 700 for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
667 /* Address torrents members */ 701 /* Address torrents members */
668 ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; 702 ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset;
669 ot_peerlist *peer_list = torrent->peer_list; 703 ot_peerlist *peer_list = torrent->peer_list;
670 ot_peer *peers = (ot_peer*)(peer_list->peers.data); 704 ot_peer *peers = (ot_peer *)(peer_list->peers.data);
671 uint8_t **dst; 705 uint8_t **dst;
672 706
673 /* Determine destination slot */ 707 /* Determine destination slot */
674 count_peers = peer_list->peer_count; 708 count_peers = peer_list->peer_count;
675 switch( count_peers ) { 709 switch (count_peers) {
676 case 0: continue; 710 case 0:
677 case 1: dst = mem_a ? &ptr_a : &ptr_c; break; 711 continue;
678 case 2: dst = mem_b ? &ptr_b : &ptr_c; break; 712 case 1:
679 default: dst = &ptr_c; break; 713 dst = mem_a ? &ptr_a : &ptr_c;
714 break;
715 case 2:
716 dst = mem_b ? &ptr_b : &ptr_c;
717 break;
718 default:
719 dst = &ptr_c;
720 break;
680 } 721 }
681 722
682 /* Copy tail of info_hash, advance pointer */ 723 /* Copy tail of info_hash, advance pointer */
683 memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1); 724 memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1);
684 *dst += sizeof( ot_hash ) - 1; 725 *dst += sizeof(ot_hash) - 1;
685 726
686 /* Encode peer count */ 727 /* Encode peer count */
687 if( dst == &ptr_c ) 728 if (dst == &ptr_c)
688 while( count_peers ) { 729 while (count_peers) {
689 if( count_peers <= 0x7f ) 730 if (count_peers <= 0x7f)
690 *(*dst)++ = count_peers; 731 *(*dst)++ = count_peers;
691 else 732 else
692 *(*dst)++ = 0x80 | ( count_peers & 0x7f ); 733 *(*dst)++ = 0x80 | (count_peers & 0x7f);
693 count_peers >>= 7; 734 count_peers >>= 7;
694 } 735 }
695 736
696 /* Copy peers */ 737 /* Copy peers */
697 count_peers = peer_list->peer_count; 738 count_peers = peer_list->peer_count;
698 while( count_peers-- ) { 739 while (count_peers--) {
699 memcpy( *dst, peers++, OT_IP_SIZE + 3 ); 740 memcpy(*dst, peers++, OT_IP_SIZE + 3);
700 *dst += OT_IP_SIZE + 3; 741 *dst += OT_IP_SIZE + 3;
701 } 742 }
702 free_peerlist(peer_list); 743 free_peerlist(peer_list);
703 } 744 }
704 745
705 free( torrents_list->data ); 746 free(torrents_list->data);
706 memset( torrents_list, 0, sizeof(*torrents_list ) ); 747 memset(torrents_list, 0, sizeof(*torrents_list));
707unlock_continue: 748 unlock_continue:
708 mutex_bucket_unlock( bucket, 0 ); 749 mutex_bucket_unlock(bucket, 0);
709 750
710 if( ptr ) { 751 if (ptr) {
711 int i; 752 int i;
712 753
713 if( ptr_b > ptr_c ) ptr_c = ptr_b; 754 if (ptr_b > ptr_c)
714 if( ptr_a > ptr_c ) ptr_c = ptr_a; 755 ptr_c = ptr_b;
756 if (ptr_a > ptr_c)
757 ptr_c = ptr_a;
715 mem = ptr_c - ptr; 758 mem = ptr_c - ptr;
716 759
717 for( i=0; i < MAX_PEERS; ++i ) { 760 for (i = 0; i < MAX_PEERS; ++i) {
718 if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) { 761 if (PROXYPEER_ISCONNECTED(g_connections[i].state)) {
719 void *tmp = malloc( mem ); 762 void *tmp = malloc(mem);
720 if( tmp ) { 763 if (tmp) {
721 memcpy( tmp, ptr, mem ); 764 memcpy(tmp, ptr, mem);
722 iob_addbuf_free( &g_connections[i].outdata, tmp, mem ); 765 iob_addbuf_free(&g_connections[i].outdata, tmp, mem);
723 io_wantwrite( g_connections[i].fd ); 766 io_wantwrite(g_connections[i].fd);
724 } 767 }
725 } 768 }
726 } 769 }
727 770
728 free( ptr ); 771 free(ptr);
729 } 772 }
730 usleep( OT_SYNC_SLEEP ); 773 usleep(OT_SYNC_SLEEP);
731 } 774 }
732 } 775 }
733 return 0; 776 return 0;
734} 777}
735 778
736static void livesync_issue_peersync( ) { 779static void livesync_issue_peersync() {
737 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, 780 socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT);
738 groupip_1, LIVESYNC_PORT); 781 g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t);
739 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
740 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; 782 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
741} 783}
742 784
743void livesync_ticker( ) { 785void livesync_ticker() {
744 /* livesync_issue_peersync sets g_next_packet_time */ 786 /* livesync_issue_peersync sets g_next_packet_time */
745 if( time(NULL) > g_next_packet_time && 787 if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id))
746 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
747 livesync_issue_peersync(); 788 livesync_issue_peersync();
748} 789}
749 790
750static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { 791static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) {
751// unsigned int i; 792 // unsigned int i;
752 793
753 *g_peerbuffer_pos = prefix; 794 *g_peerbuffer_pos = prefix;
754 memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); 795 memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1);
755 memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); 796 memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1);
756 797
757#if 0 798#if 0
758 /* Dump info_hash */ 799 /* Dump info_hash */
@@ -767,77 +808,84 @@ static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *pee
767#endif 808#endif
768 g_peerbuffer_pos += sizeof(ot_peer); 809 g_peerbuffer_pos += sizeof(ot_peer);
769 810
770 if( g_peerbuffer_pos >= g_peerbuffer_highwater ) 811 if (g_peerbuffer_pos >= g_peerbuffer_highwater)
771 livesync_issue_peersync(); 812 livesync_issue_peersync();
772} 813}
773 814
774static void process_indata( proxy_peer * peer ) { 815static void process_indata(proxy_peer *peer) {
775 size_t consumed, peers; 816 size_t consumed, peers;
776 uint8_t *data = peer->indata, *hash; 817 uint8_t *data = peer->indata, *hash;
777 uint8_t *dataend = data + peer->indata_length; 818 uint8_t *dataend = data + peer->indata_length;
778 819
779 while( 1 ) { 820 while (1) {
780 /* If we're not inside of a packet, make a new one */ 821 /* If we're not inside of a packet, make a new one */
781 if( !peer->packet_tcount ) { 822 if (!peer->packet_tcount) {
782 /* Ensure the header is complete or postpone processing */ 823 /* Ensure the header is complete or postpone processing */
783 if( data + 4 > dataend ) break; 824 if (data + 4 > dataend)
784 peer->packet_type = data[0]; 825 break;
785 peer->packet_tprefix = data[1]; 826 peer->packet_type = data[0];
786 peer->packet_tcount = data[2] * 256 + data[3]; 827 peer->packet_tprefix = data[1];
787 data += 4; 828 peer->packet_tcount = data[2] * 256 + data[3];
788printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount ); 829 data += 4;
830 printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount);
789 } 831 }
790 832
791 /* Ensure size for a minimal torrent block */ 833 /* Ensure size for a minimal torrent block */
792 if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break; 834 if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend)
835 break;
793 836
794 /* Advance pointer to peer count or peers */ 837 /* Advance pointer to peer count or peers */
795 hash = data; 838 hash = data;
796 data += sizeof(ot_hash) - 1; 839 data += sizeof(ot_hash) - 1;
797 840
798 /* Type 0 has peer count encoded before each peers */ 841 /* Type 0 has peer count encoded before each peers */
799 peers = peer->packet_type; 842 peers = peer->packet_type;
800 if( !peers ) { 843 if (!peers) {
801 int shift = 0; 844 int shift = 0;
802 do peers |= ( 0x7f & *data ) << ( 7 * shift ); 845 do
803 while ( *(data++) & 0x80 && shift++ < 6 ); 846 peers |= (0x7f & *data) << (7 * shift);
847 while (*(data++) & 0x80 && shift++ < 6);
804 } 848 }
805#if 0 849#if 0
806printf( "peers: %zd\n", peers ); 850printf( "peers: %zd\n", peers );
807#endif 851#endif
808 /* Ensure enough data being read to hold all peers */ 852 /* Ensure enough data being read to hold all peers */
809 if( data + (OT_IP_SIZE + 3) * peers > dataend ) { 853 if (data + (OT_IP_SIZE + 3) * peers > dataend) {
810 data = hash; 854 data = hash;
811 break; 855 break;
812 } 856 }
813 while( peers-- ) { 857 while (peers--) {
814 livesync_proxytell( peer->packet_tprefix, hash, data ); 858 livesync_proxytell(peer->packet_tprefix, hash, data);
815 data += OT_IP_SIZE + 3; 859 data += OT_IP_SIZE + 3;
816 } 860 }
817 --peer->packet_tcount; 861 --peer->packet_tcount;
818 } 862 }
819 863
820 consumed = data - peer->indata; 864 consumed = data - peer->indata;
821 memmove( peer->indata, data, peer->indata_length - consumed ); 865 memmove(peer->indata, data, peer->indata_length - consumed);
822 peer->indata_length -= consumed; 866 peer->indata_length -= consumed;
823} 867}
824 868
825static void * livesync_worker( void * args ) { 869static void *livesync_worker(void *args) {
826 (void)args; 870 (void)args;
827 while( 1 ) { 871 while (1) {
828 ot_ip6 in_ip; uint16_t in_port; 872 ot_ip6 in_ip;
829 size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 873 uint16_t in_port;
874 size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);
830 875
831 /* Expect at least tracker id and packet type */ 876 /* Expect at least tracker id and packet type */
832 if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) 877 if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t)))
833 continue; 878 continue;
834 if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { 879 if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) {
835 /* drop packet coming from ourselves */ 880 /* drop packet coming from ourselves */
836 continue; 881 continue;
837 } 882 }
838 switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { 883 switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) {
839 case OT_SYNC_PEER: 884 case OT_SYNC_PEER4:
840 livesync_handle_peersync( datalen ); 885 livesync_handle_peersync(datalen, OT_PEER_SIZE4);
886 break;
887 case OT_SYNC_PEER6:
888 livesync_handle_peersync(datalen, OT_PEER_SIZE6);
841 break; 889 break;
842 default: 890 default:
843 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); 891 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );