summaryrefslogtreecommitdiff
path: root/proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.c')
-rw-r--r--proxy.c858
1 files changed, 455 insertions, 403 deletions
diff --git a/proxy.c b/proxy.c
index 1f09777..9946240 100644
--- a/proxy.c
+++ b/proxy.c
@@ -4,33 +4,33 @@
4 $Id$ */ 4 $Id$ */
5 5
6/* System */ 6/* System */
7#include <arpa/inet.h>
8#include <ctype.h>
9#include <errno.h>
10#include <pthread.h>
11#include <pwd.h>
12#include <signal.h>
7#include <stdint.h> 13#include <stdint.h>
14#include <stdio.h>
8#include <stdlib.h> 15#include <stdlib.h>
9#include <string.h> 16#include <string.h>
10#include <arpa/inet.h>
11#include <sys/socket.h> 17#include <sys/socket.h>
12#include <unistd.h> 18#include <unistd.h>
13#include <errno.h>
14#include <signal.h>
15#include <stdio.h>
16#include <pwd.h>
17#include <ctype.h>
18#include <pthread.h>
19 19
20/* Libowfat */ 20/* Libowfat */
21#include "socket.h" 21#include "byte.h"
22#include "io.h" 22#include "io.h"
23#include "iob.h" 23#include "iob.h"
24#include "byte.h"
25#include "scan.h"
26#include "ip6.h" 24#include "ip6.h"
27#include "ndelay.h" 25#include "ndelay.h"
26#include "scan.h"
27#include "socket.h"
28 28
29/* Opentracker */ 29/* Opentracker */
30#include "trackerlogic.h"
31#include "ot_vector.h"
32#include "ot_mutex.h" 30#include "ot_mutex.h"
33#include "ot_stats.h" 31#include "ot_stats.h"
32#include "ot_vector.h"
33#include "trackerlogic.h"
34 34
35#ifndef WANT_SYNC_LIVE 35#ifndef WANT_SYNC_LIVE
36#define WANT_SYNC_LIVE 36#define WANT_SYNC_LIVE
@@ -40,28 +40,28 @@
40ot_ip6 g_serverip; 40ot_ip6 g_serverip;
41uint16_t g_serverport = 9009; 41uint16_t g_serverport = 9009;
42uint32_t g_tracker_id; 42uint32_t g_tracker_id;
43char groupip_1[4] = { 224,0,23,5 }; 43char groupip_1[4] = {224, 0, 23, 5};
44int g_self_pipe[2]; 44int g_self_pipe[2];
45 45
46/* If you have more than 10 peers, don't use this proxy 46/* If you have more than 10 peers, don't use this proxy
47 Use 20 slots for 10 peers to have room for 10 incoming connection slots 47 Use 20 slots for 10 peers to have room for 10 incoming connection slots
48 */ 48 */
49#define MAX_PEERS 20 49#define MAX_PEERS 20
50 50
51#define LIVESYNC_INCOMING_BUFFSIZE (256*256) 51#define LIVESYNC_INCOMING_BUFFSIZE (256 * 256)
52#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) 52#define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256)
53 53
54#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 54#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
55#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 55#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash))
56#define LIVESYNC_MAXDELAY 15 /* seconds */ 56#define LIVESYNC_MAXDELAY 15 /* seconds */
57 57
58/* The amount of time a complete sync cycle should take */ 58/* The amount of time a complete sync cycle should take */
59#define OT_SYNC_INTERVAL_MINUTES 2 59#define OT_SYNC_INTERVAL_MINUTES 2
60 60
61/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ 61/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
62#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) 62#define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT))
63 63
64enum { OT_SYNC_PEER }; 64enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
65enum { FLAG_SERVERSOCKET = 1 }; 65enum { FLAG_SERVERSOCKET = 1 };
66 66
67/* For incoming packets */ 67/* For incoming packets */
@@ -75,145 +75,153 @@ static uint8_t *g_peerbuffer_pos;
75static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; 75static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
76static ot_time g_next_packet_time; 76static ot_time g_next_packet_time;
77 77
78static void * livesync_worker( void * args ); 78static void *livesync_worker(void *args);
79static void * streamsync_worker( void * args ); 79static void *streamsync_worker(void *args);
80static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); 80static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer);
81 81
82void exerr( char * message ) { 82void exerr(char *message) {
83 fprintf( stderr, "%s\n", message ); 83 fprintf(stderr, "%s\n", message);
84 exit( 111 ); 84 exit(111);
85} 85}
86 86
87void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { 87void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) {
88 (void) event; 88 (void)event;
89 (void) proto; 89 (void)proto;
90 (void) event_data; 90 (void)event_data;
91} 91}
92 92
93void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { 93void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
94 char tmpip[4] = {0,0,0,0}; 94 char tmpip[4] = {0, 0, 0, 0};
95 char *v4ip; 95 char *v4ip;
96 96
97 if( !ip6_isv4mapped(ip)) 97 if (!ip6_isv4mapped(ip))
98 exerr("v6 mcast support not yet available."); 98 exerr("v6 mcast support not yet available.");
99 v4ip = ip+12; 99 v4ip = ip + 12;
100 100
101 if( g_socket_in != -1 ) 101 if (g_socket_in != -1)
102 exerr("Error: Livesync listen ip specified twice."); 102 exerr("Error: Livesync listen ip specified twice.");
103 103
104 if( ( g_socket_in = socket_udp4( )) < 0) 104 if ((g_socket_in = socket_udp4()) < 0)
105 exerr("Error: Cant create live sync incoming socket." ); 105 exerr("Error: Cant create live sync incoming socket.");
106 ndelay_off(g_socket_in); 106 ndelay_off(g_socket_in);
107 107
108 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) 108 if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1)
109 exerr("Error: Cant bind live sync incoming socket." ); 109 exerr("Error: Cant bind live sync incoming socket.");
110 110
111 if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) 111 if (socket_mcjoin4(g_socket_in, groupip_1, v4ip))
112 exerr("Error: Cant make live sync incoming socket join mcast group."); 112 exerr("Error: Cant make live sync incoming socket join mcast group.");
113 113
114 if( ( g_socket_out = socket_udp4()) < 0) 114 if ((g_socket_out = socket_udp4()) < 0)
115 exerr("Error: Cant create live sync outgoing socket." ); 115 exerr("Error: Cant create live sync outgoing socket.");
116 if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) 116 if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1)
117 exerr("Error: Cant bind live sync outgoing socket." ); 117 exerr("Error: Cant bind live sync outgoing socket.");
118 118
119 socket_mcttl4(g_socket_out, 1); 119 socket_mcttl4(g_socket_out, 1);
120 socket_mcloop4(g_socket_out, 1); 120 socket_mcloop4(g_socket_out, 1);
121} 121}
122 122
123size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) { 123size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
124 int exactmatch; 124 int exactmatch;
125 ot_torrent *torrent; 125 ot_torrent *torrent;
126 ot_peer *peer_dest; 126 ot_peerlist *peer_list;
127 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 127 ot_peer *peer_dest;
128 128 ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
129 torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); 129 size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size);
130 if( !torrent ) 130
131 torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch);
132 if (!torrent)
131 return -1; 133 return -1;
132 134
133 if( !exactmatch ) { 135 if (!exactmatch) {
134 /* Create a new torrent entry, then */ 136 /* Create a new torrent entry, then */
135 memcpy( torrent->hash, hash, sizeof(ot_hash) ); 137 memcpy(torrent->hash, hash, sizeof(ot_hash));
136 138
137 if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { 139 if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) {
138 vector_remove_torrent( torrents_list, torrent ); 140 vector_remove_torrent(torrents_list, torrent);
139 mutex_bucket_unlock_by_hash( hash, 0 ); 141 mutex_bucket_unlock_by_hash(hash, 0);
140 return -1; 142 return -1;
141 } 143 }
142 144
143 byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); 145 byte_zero(torrent->peer_list6, sizeof(ot_peerlist));
146 byte_zero(torrent->peer_list4, sizeof(ot_peerlist));
144 } 147 }
145 148
149 peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
150
146 /* Check for peer in torrent */ 151 /* Check for peer in torrent */
147 peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); 152 peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch);
148 if( !peer_dest ) { 153 if (!peer_dest) {
149 mutex_bucket_unlock_by_hash( hash, 0 ); 154 mutex_bucket_unlock_by_hash(hash, 0);
150 return -1; 155 return -1;
151 } 156 }
152 /* Tell peer that it's fresh */ 157 /* Tell peer that it's fresh */
153 OT_PEERTIME( peer ) = 0; 158 OT_PEERTIME(peer, peer_size) = 0;
154 159
155 /* If we hadn't had a match create peer there */ 160 /* If we hadn't had a match create peer there */
156 if( !exactmatch ) { 161 if (!exactmatch) {
157 torrent->peer_list->peer_count++; 162 peer_list->peer_count++;
158 if( OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) 163 if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING)
159 torrent->peer_list->seed_count++; 164 peer_list->seed_count++;
160 } 165 }
161 memcpy( peer_dest, peer, sizeof(ot_peer) ); 166 memcpy(peer_dest, peer, peer_size);
162 mutex_bucket_unlock_by_hash( hash, 0 ); 167 mutex_bucket_unlock_by_hash(hash, 0);
163 return 0; 168 return 0;
164} 169}
165 170
166size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) { 171size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
167 int exactmatch; 172 int exactmatch;
168 ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); 173 ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash);
169 ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); 174 ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch);
170 175
171 if( exactmatch ) { 176 if (exactmatch) {
172 ot_peerlist *peer_list = torrent->peer_list; 177 ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
173 switch( vector_remove_peer( &peer_list->peers, peer ) ) { 178 switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) {
174 case 2: peer_list->seed_count--; /* Fall throughs intended */ 179 case 2:
175 case 1: peer_list->peer_count--; /* Fall throughs intended */ 180 peer_list->seed_count--; /* Intentional fallthrough */
176 default: break; 181 case 1:
182 peer_list->peer_count--; /* Intentional fallthrough */
183 default:
184 break;
177 } 185 }
178 } 186 }
179 187
180 mutex_bucket_unlock_by_hash( hash, 0 ); 188 mutex_bucket_unlock_by_hash(hash, 0);
181 return 0; 189 return 0;
182} 190}
183 191
184void free_peerlist( ot_peerlist *peer_list ) { 192void free_peerlist(ot_peerlist *peer_list) {
185 if( peer_list->peers.data ) { 193 if (peer_list->peers.data) {
186 if( OT_PEERLIST_HASBUCKETS( peer_list ) ) { 194 if (OT_PEERLIST_HASBUCKETS(peer_list)) {
187 ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data); 195 ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data);
188 196
189 while( peer_list->peers.size-- ) 197 while (peer_list->peers.size--)
190 free( bucket_list++->data ); 198 free(bucket_list++->data);
191 } 199 }
192 free( peer_list->peers.data ); 200 free(peer_list->peers.data);
193 } 201 }
194 free( peer_list ); 202 free(peer_list);
195} 203}
196 204
197static void livesync_handle_peersync( ssize_t datalen ) { 205static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) {
198 int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 206 int off = sizeof(g_tracker_id) + sizeof(uint32_t);
199 207
200 fprintf( stderr, "." ); 208 fprintf(stderr, ".");
201 209
202 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { 210 while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) {
203 ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); 211 ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash));
204 ot_hash *hash = (ot_hash*)(g_inbuffer + off); 212 ot_hash *hash = (ot_hash *)(g_inbuffer + off);
205 213
206 if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) 214 if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED)
207 remove_peer_from_torrent_proxy( *hash, peer ); 215 remove_peer_from_torrent_proxy(*hash, peer, peer_size);
208 else 216 else
209 add_peer_to_torrent_proxy( *hash, peer ); 217 add_peer_to_torrent_proxy(*hash, peer, peer_size);
210 218
211 off += sizeof( ot_hash ) + sizeof( ot_peer ); 219 off += sizeof(ot_hash) + peer_size;
212 } 220 }
213} 221}
214 222
215int usage( char *self ) { 223int usage(char *self) {
216 fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self ); 224 fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self);
217 return 0; 225 return 0;
218} 226}
219 227
@@ -228,115 +236,115 @@ enum {
228 FLAG_MASK = 0x07 236 FLAG_MASK = 0x07
229}; 237};
230 238
231#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) 239#define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING)
232#define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED) 240#define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED)
233#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) 241#define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED)
234#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) 242#define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING)
235#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) 243#define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID)
236#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) 244#define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED)
237 245
238typedef struct { 246typedef struct {
239 int state; /* Whether we want to connect, how far our handshake is, etc. */ 247 int state; /* Whether we want to connect, how far our handshake is, etc. */
240 ot_ip6 ip; /* The peer to connect to */ 248 ot_ip6 ip; /* The peer to connect to */
241 uint16_t port; /* The peers port */ 249 uint16_t port; /* The peers port */
242 uint8_t indata[8192*16]; /* Any data not processed yet */ 250 uint8_t indata[8192 * 16]; /* Any data not processed yet */
243 size_t indata_length; /* Length of unprocessed data */ 251 size_t indata_length; /* Length of unprocessed data */
244 uint32_t tracker_id; /* How the other end greeted */ 252 uint32_t tracker_id; /* How the other end greeted */
245 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ 253 int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
246 io_batch outdata; /* The iobatch containing our sync data */ 254 io_batch outdata; /* The iobatch containing our sync data */
247 255
248 size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ 256 size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
249 uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ 257 uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
250 uint8_t packet_type; /* Type of current packet */ 258 uint8_t packet_type; /* Type of current packet */
251 uint32_t packet_tid; /* Tracker id for current packet */ 259 uint32_t packet_tid; /* Tracker id for current packet */
252 260
253} proxy_peer; 261} proxy_peer;
254static void process_indata( proxy_peer * peer ); 262static void process_indata(proxy_peer *peer);
255 263
256void reset_info_block( proxy_peer * peer ) { 264void reset_info_block(proxy_peer *peer) {
257 peer->indata_length = 0; 265 peer->indata_length = 0;
258 peer->tracker_id = 0; 266 peer->tracker_id = 0;
259 peer->fd = -1; 267 peer->fd = -1;
260 peer->packet_tcount = 0; 268 peer->packet_tcount = 0;
261 iob_reset( &peer->outdata ); 269 iob_reset(&peer->outdata);
262 PROXYPEER_SETDISCONNECTED( peer->state ); 270 PROXYPEER_SETDISCONNECTED(peer->state);
263} 271}
264 272
265/* Number of connections to peers 273/* Number of connections to peers
266 * If a peer's IP is set, we try to reconnect, when the connection drops 274 * If a peer's IP is set, we try to reconnect, when the connection drops
267 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it 275 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
268 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match 276 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
269 * Reconnect attempts occur only twice a minute 277 * Reconnect attempts occur only twice a minute
270*/ 278 */
271static int g_connection_count; 279static int g_connection_count;
272static ot_time g_connection_reconn; 280static ot_time g_connection_reconn;
273static proxy_peer g_connections[MAX_PEERS]; 281static proxy_peer g_connections[MAX_PEERS];
274 282
275static void handle_reconnects( void ) { 283static void handle_reconnects(void) {
276 int i; 284 int i;
277 for( i=0; i<g_connection_count; ++i ) 285 for (i = 0; i < g_connection_count; ++i)
278 if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { 286 if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) {
279 int64 newfd = socket_tcp6( ); 287 int64 newfd = socket_tcp6();
280 fprintf( stderr, "(Re)connecting to peer..." ); 288 fprintf(stderr, "(Re)connecting to peer...");
281 if( newfd < 0 ) continue; /* No socket for you */ 289 if (newfd < 0)
290 continue; /* No socket for you */
282 io_fd(newfd); 291 io_fd(newfd);
283 if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { 292 if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) {
284 io_close( newfd ); 293 io_close(newfd);
285 continue; 294 continue;
286 } 295 }
287 if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && 296 if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) {
288 errno != EINPROGRESS && errno != EWOULDBLOCK ) {
289 close(newfd); 297 close(newfd);
290 continue; 298 continue;
291 } 299 }
292 io_wantwrite(newfd); /* So we will be informed when it is connected */ 300 io_wantwrite(newfd); /* So we will be informed when it is connected */
293 io_setcookie(newfd,g_connections+i); 301 io_setcookie(newfd, g_connections + i);
294 302
295 /* Prepare connection info block */ 303 /* Prepare connection info block */
296 reset_info_block( g_connections+i ); 304 reset_info_block(g_connections + i);
297 g_connections[i].fd = newfd; 305 g_connections[i].fd = newfd;
298 PROXYPEER_SETCONNECTING( g_connections[i].state ); 306 PROXYPEER_SETCONNECTING(g_connections[i].state);
299 } 307 }
300 g_connection_reconn = time(NULL) + 30; 308 g_connection_reconn = time(NULL) + 30;
301} 309}
302 310
303/* Handle incoming connection requests, check against whitelist */ 311/* Handle incoming connection requests, check against whitelist */
304static void handle_accept( int64 serversocket ) { 312static void handle_accept(int64 serversocket) {
305 int64 newfd; 313 int64 newfd;
306 ot_ip6 ip; 314 ot_ip6 ip;
307 uint16 port; 315 uint16 port;
308 316
309 while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { 317 while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) {
310 318
311 /* XXX some access control */ 319 /* XXX some access control */
312 320
313 /* Put fd into a non-blocking mode */ 321 /* Put fd into a non-blocking mode */
314 io_nonblock( newfd ); 322 io_nonblock(newfd);
315 323
316 if( !io_fd( newfd ) ) 324 if (!io_fd(newfd))
317 io_close( newfd ); 325 io_close(newfd);
318 else { 326 else {
319 /* Find a new home for our incoming connection */ 327 /* Find a new home for our incoming connection */
320 int i; 328 int i;
321 for( i=0; i<MAX_PEERS; ++i ) 329 for (i = 0; i < MAX_PEERS; ++i)
322 if( g_connections[i].state == FLAG_DISCONNECTED ) 330 if (g_connections[i].state == FLAG_DISCONNECTED)
323 break; 331 break;
324 if( i == MAX_PEERS ) { 332 if (i == MAX_PEERS) {
325 fprintf( stderr, "No room for incoming connection." ); 333 fprintf(stderr, "No room for incoming connection.");
326 close( newfd ); 334 close(newfd);
327 continue; 335 continue;
328 } 336 }
329 337
330 /* Prepare connection info block */ 338 /* Prepare connection info block */
331 reset_info_block( g_connections+i ); 339 reset_info_block(g_connections + i);
332 PROXYPEER_SETCONNECTING( g_connections[i].state ); 340 PROXYPEER_SETCONNECTING(g_connections[i].state);
333 g_connections[i].port = port; 341 g_connections[i].port = port;
334 g_connections[i].fd = newfd; 342 g_connections[i].fd = newfd;
335 343
336 io_setcookie( newfd, g_connections + i ); 344 io_setcookie(newfd, g_connections + i);
337 345
338 /* We expect the connecting side to begin with its tracker_id */ 346 /* We expect the connecting side to begin with its tracker_id */
339 io_wantread( newfd ); 347 io_wantread(newfd);
340 } 348 }
341 } 349 }
342 350
@@ -344,117 +352,116 @@ static void handle_accept( int64 serversocket ) {
344} 352}
345 353
346/* New sync data on the stream */ 354/* New sync data on the stream */
347static void handle_read( int64 peersocket ) { 355static void handle_read(int64 peersocket) {
348 int i; 356 int i;
349 int64 datalen; 357 int64 datalen;
350 uint32_t tracker_id; 358 uint32_t tracker_id;
351 proxy_peer *peer = io_getcookie( peersocket ); 359 proxy_peer *peer = io_getcookie(peersocket);
352 360
353 if( !peer ) { 361 if (!peer) {
354 /* Can't happen ;) */ 362 /* Can't happen ;) */
355 io_close( peersocket ); 363 io_close(peersocket);
356 return; 364 return;
357 } 365 }
358 switch( peer->state & FLAG_MASK ) { 366 switch (peer->state & FLAG_MASK) {
359 case FLAG_DISCONNECTED: 367 case FLAG_DISCONNECTED:
360 io_close( peersocket ); 368 io_close(peersocket);
361 break; /* Shouldnt happen */ 369 break; /* Shouldnt happen */
362 case FLAG_CONNECTING: 370 case FLAG_CONNECTING:
363 case FLAG_WAITTRACKERID: 371 case FLAG_WAITTRACKERID:
364 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) 372 /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
365 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ 373 This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
366 if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) 374 if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id))
367 goto close_socket; 375 goto close_socket;
368 376
369 /* See, if we already have a connection to that peer */ 377 /* See, if we already have a connection to that peer */
370 for( i=0; i<MAX_PEERS; ++i ) 378 for (i = 0; i < MAX_PEERS; ++i)
371 if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && 379 if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) {
372 g_connections[i].tracker_id == tracker_id ) { 380 fprintf(stderr, "Peer already connected. Closing connection.\n");
373 fprintf( stderr, "Peer already connected. Closing connection.\n" );
374 goto close_socket; 381 goto close_socket;
375 } 382 }
376 383
377 /* Also no need for soliloquy */ 384 /* Also no need for soliloquy */
378 if( tracker_id == g_tracker_id ) 385 if (tracker_id == g_tracker_id)
379 goto close_socket; 386 goto close_socket;
380 387
381 /* The new connection is good, send our tracker_id on incoming connections */ 388 /* The new connection is good, send our tracker_id on incoming connections */
382 if( peer->state == FLAG_CONNECTING ) 389 if (peer->state == FLAG_CONNECTING)
383 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) ) 390 if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id))
384 goto close_socket; 391 goto close_socket;
385 392
386 peer->tracker_id = tracker_id; 393 peer->tracker_id = tracker_id;
387 PROXYPEER_SETCONNECTED( peer->state ); 394 PROXYPEER_SETCONNECTED(peer->state);
388 395
389 if( peer->state & FLAG_OUTGOING ) 396 if (peer->state & FLAG_OUTGOING)
390 fprintf( stderr, "succeeded.\n" ); 397 fprintf(stderr, "succeeded.\n");
391 else 398 else
392 fprintf( stderr, "Incoming connection successful.\n" ); 399 fprintf(stderr, "Incoming connection successful.\n");
393 400
394 break; 401 break;
395close_socket: 402 close_socket:
396 fprintf( stderr, "Handshake incomplete, closing socket\n" ); 403 fprintf(stderr, "Handshake incomplete, closing socket\n");
397 io_close( peersocket ); 404 io_close(peersocket);
398 reset_info_block( peer ); 405 reset_info_block(peer);
399 break; 406 break;
400 case FLAG_CONNECTED: 407 case FLAG_CONNECTED:
401 /* Here we acutally expect data from peer 408 /* Here we acutally expect data from peer
402 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ 409 indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
403 datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); 410 datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length);
404 if( !datalen || datalen < -1 ) { 411 if (!datalen || datalen < -1) {
405 fprintf( stderr, "Connection closed by remote peer.\n" ); 412 fprintf(stderr, "Connection closed by remote peer.\n");
406 io_close( peersocket ); 413 io_close(peersocket);
407 reset_info_block( peer ); 414 reset_info_block(peer);
408 } else if( datalen > 0 ) { 415 } else if (datalen > 0) {
409 peer->indata_length += datalen; 416 peer->indata_length += datalen;
410 process_indata( peer ); 417 process_indata(peer);
411 } 418 }
412 break; 419 break;
413 } 420 }
414} 421}
415 422
416/* Can write new sync data to the stream */ 423/* Can write new sync data to the stream */
417static void handle_write( int64 peersocket ) { 424static void handle_write(int64 peersocket) {
418 proxy_peer *peer = io_getcookie( peersocket ); 425 proxy_peer *peer = io_getcookie(peersocket);
419 426
420 if( !peer ) { 427 if (!peer) {
421 /* Can't happen ;) */ 428 /* Can't happen ;) */
422 io_close( peersocket ); 429 io_close(peersocket);
423 return; 430 return;
424 } 431 }
425 432
426 switch( peer->state & FLAG_MASK ) { 433 switch (peer->state & FLAG_MASK) {
427 case FLAG_DISCONNECTED: 434 case FLAG_DISCONNECTED:
428 default: /* Should not happen */ 435 default: /* Should not happen */
429 io_close( peersocket ); 436 io_close(peersocket);
430 break; 437 break;
431 case FLAG_CONNECTING: 438 case FLAG_CONNECTING:
432 /* Ensure that the connection is established and handle connection error */ 439 /* Ensure that the connection is established and handle connection error */
433 if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { 440 if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) {
434 fprintf( stderr, "failed\n" ); 441 fprintf(stderr, "failed\n");
435 reset_info_block( peer ); 442 reset_info_block(peer);
436 io_close( peersocket ); 443 io_close(peersocket);
437 break; 444 break;
438 } 445 }
439 446
440 if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) { 447 if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) {
441 PROXYPEER_SETWAITTRACKERID( peer->state ); 448 PROXYPEER_SETWAITTRACKERID(peer->state);
442 io_dontwantwrite( peersocket ); 449 io_dontwantwrite(peersocket);
443 io_wantread( peersocket ); 450 io_wantread(peersocket);
444 } else { 451 } else {
445 fprintf( stderr, "Handshake incomplete, closing socket\n" ); 452 fprintf(stderr, "Handshake incomplete, closing socket\n");
446 io_close( peersocket ); 453 io_close(peersocket);
447 reset_info_block( peer ); 454 reset_info_block(peer);
448 } 455 }
449 break; 456 break;
450 case FLAG_CONNECTED: 457 case FLAG_CONNECTED:
451 switch( iob_send( peersocket, &peer->outdata ) ) { 458 switch (iob_send(peersocket, &peer->outdata)) {
452 case 0: /* all data sent */ 459 case 0: /* all data sent */
453 io_dontwantwrite( peersocket ); 460 io_dontwantwrite(peersocket);
454 break; 461 break;
455 case -3: /* an error occured */ 462 case -3: /* an error occured */
456 io_close( peersocket ); 463 io_close(peersocket);
457 reset_info_block( peer ); 464 reset_info_block(peer);
458 break; 465 break;
459 default: /* Normal operation or eagain */ 466 default: /* Normal operation or eagain */
460 break; 467 break;
@@ -469,286 +476,324 @@ static void server_mainloop() {
469 int64 sock; 476 int64 sock;
470 477
471 /* inlined livesync_init() */ 478 /* inlined livesync_init() */
472 memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); 479 memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start));
473 g_peerbuffer_pos = g_peerbuffer_start; 480 g_peerbuffer_pos = g_peerbuffer_start;
474 memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 481 memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id));
475 uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); 482 uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER);
476 g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); 483 g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t);
477 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; 484 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
478 485
479 while(1) { 486 while (1) {
480 /* See, if we need to connect to anyone */ 487 /* See if we need to connect to anyone */
481 if( time(NULL) > g_connection_reconn ) 488 if (time(NULL) > g_connection_reconn)
482 handle_reconnects( ); 489 handle_reconnects();
483 490
484 /* Wait for io events until next approx reconn check time */ 491 /* Wait for io events until next approx reconn check time */
485 io_waituntil2( 30*1000 ); 492 io_waituntil2(30 * 1000);
486 493
487 /* Loop over readable sockets */ 494 /* Loop over readable sockets */
488 while( ( sock = io_canread( ) ) != -1 ) { 495 while ((sock = io_canread()) != -1) {
489 const void *cookie = io_getcookie( sock ); 496 const void *cookie = io_getcookie(sock);
490 if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) 497 if ((uintptr_t)cookie == FLAG_SERVERSOCKET)
491 handle_accept( sock ); 498 handle_accept(sock);
492 else 499 else
493 handle_read( sock ); 500 handle_read(sock);
494 } 501 }
495 502
496 /* Loop over writable sockets */ 503 /* Loop over writable sockets */
497 while( ( sock = io_canwrite( ) ) != -1 ) 504 while ((sock = io_canwrite()) != -1)
498 handle_write( sock ); 505 handle_write(sock);
499 506
500 livesync_ticker( ); 507 livesync_ticker();
501 } 508 }
502} 509}
503 510
504static void panic( const char *routine ) { 511static void panic(const char *routine) {
505 fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); 512 fprintf(stderr, "%s: %s\n", routine, strerror(errno));
506 exit( 111 ); 513 exit(111);
507} 514}
508 515
509static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { 516static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) {
510 int64 sock = socket_tcp6( ); 517 int64 sock = socket_tcp6();
511 518
512 if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) 519 if (socket_bind6_reuse(sock, ip, port, 0) == -1)
513 panic( "socket_bind6_reuse" ); 520 panic("socket_bind6_reuse");
514 521
515 if( socket_listen( sock, SOMAXCONN) == -1 ) 522 if (socket_listen(sock, SOMAXCONN) == -1)
516 panic( "socket_listen" ); 523 panic("socket_listen");
517 524
518 if( !io_fd( sock ) ) 525 if (!io_fd(sock))
519 panic( "io_fd" ); 526 panic("io_fd");
520 527
521 io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); 528 io_setcookie(sock, (void *)FLAG_SERVERSOCKET);
522 io_wantread( sock ); 529 io_wantread(sock);
523 return sock; 530 return sock;
524} 531}
525 532
526 533static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) {
527static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
528 const char *s = src; 534 const char *s = src;
529 int off, bracket = 0; 535 int off, bracket = 0;
530 while( isspace(*s) ) ++s; 536 while (isspace(*s))
531 if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ 537 ++s;
532 if( !(off = scan_ip6( s, ip ) ) ) 538 if (*s == '[')
539 ++s, ++bracket; /* for v6 style notation */
540 if (!(off = scan_ip6(s, ip)))
533 return 0; 541 return 0;
534 s += off; 542 s += off;
535 if( *s == 0 || isspace(*s)) return s-src; 543 if (*s == 0 || isspace(*s))
536 if( *s == ']' && bracket ) ++s; 544 return s - src;
537 if( !ip6_isv4mapped(ip)){ 545 if (*s == ']' && bracket)
538 if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; 546 ++s;
547 if (!ip6_isv4mapped(ip)) {
548 if ((bracket && *(s) != ':') || (*(s) != '.'))
549 return 0;
539 s++; 550 s++;
540 } else { 551 } else {
541 if( *(s++) != ':' ) return 0; 552 if (*(s++) != ':')
553 return 0;
542 } 554 }
543 if( !(off = scan_ushort (s, port ) ) ) 555 if (!(off = scan_ushort(s, port)))
544 return 0; 556 return 0;
545 return off+s-src; 557 return off + s - src;
546} 558}
547 559
548int main( int argc, char **argv ) { 560int main(int argc, char **argv) {
549 static pthread_t sync_in_thread_id; 561 static pthread_t sync_in_thread_id;
550 static pthread_t sync_out_thread_id; 562 static pthread_t sync_out_thread_id;
551 ot_ip6 serverip; 563 ot_ip6 serverip;
552 uint16_t tmpport; 564 uint16_t tmpport;
553 int scanon = 1, lbound = 0, sbound = 0; 565 int scanon = 1, lbound = 0, sbound = 0;
554 566
555 srandom( time(NULL) ); 567 srandom(time(NULL));
568#ifdef WANT_ARC4RANDOM
569 g_tracker_id = arc4random();
570#else
556 g_tracker_id = random(); 571 g_tracker_id = random();
557 noipv6=1; 572#endif
558 573
559 while( scanon ) { 574 while (scanon) {
560 switch( getopt( argc, argv, ":l:c:L:h" ) ) { 575 switch (getopt(argc, argv, ":l:c:L:h")) {
561 case -1: scanon = 0; break; 576 case -1:
577 scanon = 0;
578 break;
562 case 'l': 579 case 'l':
563 tmpport = 0; 580 tmpport = 0;
564 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } 581 if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
565 ot_try_bind( serverip, tmpport ); 582 usage(argv[0]);
583 exit(1);
584 }
585 ot_try_bind(serverip, tmpport);
566 ++sbound; 586 ++sbound;
567 break; 587 break;
568 case 'c': 588 case 'c':
569 if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); 589 if (g_connection_count > MAX_PEERS / 2)
590 exerr("Connection limit exceeded.\n");
570 tmpport = 0; 591 tmpport = 0;
571 if( !scan_ip6_port( optarg, 592 if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) {
572 g_connections[g_connection_count].ip, 593 usage(argv[0]);
573 &g_connections[g_connection_count].port ) || 594 exit(1);
574 !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } 595 }
575 g_connections[g_connection_count++].state = FLAG_OUTGOING; 596 g_connections[g_connection_count++].state = FLAG_OUTGOING;
576 break; 597 break;
577 case 'L': 598 case 'L':
578 tmpport = 9696; 599 tmpport = 9696;
579 if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } 600 if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
580 livesync_bind_mcast( serverip, tmpport); ++lbound; break; 601 usage(argv[0]);
602 exit(1);
603 }
604 livesync_bind_mcast(serverip, tmpport);
605 ++lbound;
606 break;
581 default: 607 default:
582 case '?': usage( argv[0] ); exit( 1 ); 608 case '?':
609 usage(argv[0]);
610 exit(1);
583 } 611 }
584 } 612 }
585 613
586 if( !lbound ) exerr( "No livesync port bound." ); 614 if (!lbound)
587 if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); 615 exerr("No livesync port bound.");
588 pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); 616 if (!g_connection_count && !sbound)
589 pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); 617 exerr("No streamsync port bound.");
618 pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL);
619 pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL);
590 620
591 server_mainloop(); 621 server_mainloop();
592 return 0; 622 return 0;
593} 623}
594 624
595static void * streamsync_worker( void * args ) { 625static void *streamsync_worker(void *args) {
596 (void)args; 626 (void)args;
597 while( 1 ) { 627 while (1) {
598 int bucket; 628 int bucket;
599 /* For each bucket... */ 629 /* For each bucket... */
600 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 630 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
601 /* Get exclusive access to that bucket */ 631 /* Get exclusive access to that bucket */
602 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 632 ot_vector *torrents_list = mutex_bucket_lock(bucket);
603 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; 633 size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
604 size_t mem, mem_a = 0, mem_b = 0; 634 size_t mem, mem_a = 0, mem_b = 0;
605 uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; 635 uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
606 636
607 if( !torrents_list->size ) goto unlock_continue; 637 if (!torrents_list->size)
638 goto unlock_continue;
608 639
609 /* For each torrent in this bucket.. */ 640 /* For each torrent in this bucket.. */
610 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 641 for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
611 /* Address torrents members */ 642 /* Address torrents members */
612 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; 643 ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list;
613 switch( peer_list->peer_count ) { 644 switch (peer_list->peer_count) {
614 case 2: count_two++; break; 645 case 2:
615 case 1: count_one++; break; 646 count_two++;
616 case 0: break; 647 break;
617 default: count_def++; 648 case 1:
618 count_peers += peer_list->peer_count; 649 count_one++;
650 break;
651 case 0:
652 break;
653 default:
654 count_def++;
655 count_peers += peer_list->peer_count;
619 } 656 }
620 } 657 }
621 658
622 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ 659 /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
623 mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) + 660 mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7;
624 ( count_one + 2 * count_two + count_peers ) * 7; 661
625 662 fprintf(stderr, "Mem: %zd\n", mem);
626 fprintf( stderr, "Mem: %zd\n", mem ); 663
627 664 ptr = ptr_a = ptr_b = ptr_c = malloc(mem);
628 ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); 665 if (!ptr)
629 if( !ptr ) goto unlock_continue; 666 goto unlock_continue;
630 667
631 if( count_one > 4 || !count_def ) { 668 if (count_one > 4 || !count_def) {
632 mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 ); 669 mem_a = 1 + 1 + 2 + count_one * (19 + 7);
633 ptr_b += mem_a; ptr_c += mem_a; 670 ptr_b += mem_a;
634 ptr_a[0] = 1; /* Offset 0: packet type 1 */ 671 ptr_c += mem_a;
635 ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 672 ptr_a[0] = 1; /* Offset 0: packet type 1 */
636 ptr_a[2] = count_one >> 8; 673 ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
637 ptr_a[3] = count_one & 255; 674 ptr_a[2] = count_one >> 8;
638 ptr_a += 4; 675 ptr_a[3] = count_one & 255;
676 ptr_a += 4;
639 } else 677 } else
640 count_def += count_one; 678 count_def += count_one;
641 679
642 if( count_two > 4 || !count_def ) { 680 if (count_two > 4 || !count_def) {
643 mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 ); 681 mem_b = 1 + 1 + 2 + count_two * (19 + 14);
644 ptr_c += mem_b; 682 ptr_c += mem_b;
645 ptr_b[0] = 2; /* Offset 0: packet type 2 */ 683 ptr_b[0] = 2; /* Offset 0: packet type 2 */
646 ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 684 ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
647 ptr_b[2] = count_two >> 8; 685 ptr_b[2] = count_two >> 8;
648 ptr_b[3] = count_two & 255; 686 ptr_b[3] = count_two & 255;
649 ptr_b += 4; 687 ptr_b += 4;
650 } else 688 } else
651 count_def += count_two; 689 count_def += count_two;
652 690
653 if( count_def ) { 691 if (count_def) {
654 ptr_c[0] = 0; /* Offset 0: packet type 0 */ 692 ptr_c[0] = 0; /* Offset 0: packet type 0 */
655 ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ 693 ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
656 ptr_c[2] = count_def >> 8; 694 ptr_c[2] = count_def >> 8;
657 ptr_c[3] = count_def & 255; 695 ptr_c[3] = count_def & 255;
658 ptr_c += 4; 696 ptr_c += 4;
659 } 697 }
660 698
661 /* For each torrent in this bucket.. */ 699 /* For each torrent in this bucket.. */
662 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 700 for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
663 /* Address torrents members */ 701 /* Address torrents members */
664 ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; 702 ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset;
665 ot_peerlist *peer_list = torrent->peer_list; 703 ot_peerlist *peer_list = torrent->peer_list;
666 ot_peer *peers = (ot_peer*)(peer_list->peers.data); 704 ot_peer *peers = (ot_peer *)(peer_list->peers.data);
667 uint8_t **dst; 705 uint8_t **dst;
668 706
669 /* Determine destination slot */ 707 /* Determine destination slot */
670 count_peers = peer_list->peer_count; 708 count_peers = peer_list->peer_count;
671 switch( count_peers ) { 709 switch (count_peers) {
672 case 0: continue; 710 case 0:
673 case 1: dst = mem_a ? &ptr_a : &ptr_c; break; 711 continue;
674 case 2: dst = mem_b ? &ptr_b : &ptr_c; break; 712 case 1:
675 default: dst = &ptr_c; break; 713 dst = mem_a ? &ptr_a : &ptr_c;
714 break;
715 case 2:
716 dst = mem_b ? &ptr_b : &ptr_c;
717 break;
718 default:
719 dst = &ptr_c;
720 break;
676 } 721 }
677 722
678 /* Copy tail of info_hash, advance pointer */ 723 /* Copy tail of info_hash, advance pointer */
679 memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1); 724 memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1);
680 *dst += sizeof( ot_hash ) - 1; 725 *dst += sizeof(ot_hash) - 1;
681 726
682 /* Encode peer count */ 727 /* Encode peer count */
683 if( dst == &ptr_c ) 728 if (dst == &ptr_c)
684 while( count_peers ) { 729 while (count_peers) {
685 if( count_peers <= 0x7f ) 730 if (count_peers <= 0x7f)
686 *(*dst)++ = count_peers; 731 *(*dst)++ = count_peers;
687 else 732 else
688 *(*dst)++ = 0x80 | ( count_peers & 0x7f ); 733 *(*dst)++ = 0x80 | (count_peers & 0x7f);
689 count_peers >>= 7; 734 count_peers >>= 7;
690 } 735 }
691 736
692 /* Copy peers */ 737 /* Copy peers */
693 count_peers = peer_list->peer_count; 738 count_peers = peer_list->peer_count;
694 while( count_peers-- ) { 739 while (count_peers--) {
695 memcpy( *dst, peers++, OT_IP_SIZE + 3 ); 740 memcpy(*dst, peers++, OT_IP_SIZE + 3);
696 *dst += OT_IP_SIZE + 3; 741 *dst += OT_IP_SIZE + 3;
697 } 742 }
698 free_peerlist(peer_list); 743 free_peerlist(peer_list);
699 } 744 }
700 745
701 free( torrents_list->data ); 746 free(torrents_list->data);
702 memset( torrents_list, 0, sizeof(*torrents_list ) ); 747 memset(torrents_list, 0, sizeof(*torrents_list));
703unlock_continue: 748 unlock_continue:
704 mutex_bucket_unlock( bucket, 0 ); 749 mutex_bucket_unlock(bucket, 0);
705 750
706 if( ptr ) { 751 if (ptr) {
707 int i; 752 int i;
708 753
709 if( ptr_b > ptr_c ) ptr_c = ptr_b; 754 if (ptr_b > ptr_c)
710 if( ptr_a > ptr_c ) ptr_c = ptr_a; 755 ptr_c = ptr_b;
756 if (ptr_a > ptr_c)
757 ptr_c = ptr_a;
711 mem = ptr_c - ptr; 758 mem = ptr_c - ptr;
712 759
713 for( i=0; i < MAX_PEERS; ++i ) { 760 for (i = 0; i < MAX_PEERS; ++i) {
714 if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) { 761 if (PROXYPEER_ISCONNECTED(g_connections[i].state)) {
715 void *tmp = malloc( mem ); 762 void *tmp = malloc(mem);
716 if( tmp ) { 763 if (tmp) {
717 memcpy( tmp, ptr, mem ); 764 memcpy(tmp, ptr, mem);
718 iob_addbuf_free( &g_connections[i].outdata, tmp, mem ); 765 iob_addbuf_free(&g_connections[i].outdata, tmp, mem);
719 io_wantwrite( g_connections[i].fd ); 766 io_wantwrite(g_connections[i].fd);
720 } 767 }
721 } 768 }
722 } 769 }
723 770
724 free( ptr ); 771 free(ptr);
725 } 772 }
726 usleep( OT_SYNC_SLEEP ); 773 usleep(OT_SYNC_SLEEP);
727 } 774 }
728 } 775 }
729 return 0; 776 return 0;
730} 777}
731 778
732static void livesync_issue_peersync( ) { 779static void livesync_issue_peersync() {
733 socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, 780 socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT);
734 groupip_1, LIVESYNC_PORT); 781 g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t);
735 g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
736 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; 782 g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
737} 783}
738 784
739void livesync_ticker( ) { 785void livesync_ticker() {
740 /* livesync_issue_peersync sets g_next_packet_time */ 786 /* livesync_issue_peersync sets g_next_packet_time */
741 if( time(NULL) > g_next_packet_time && 787 if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id))
742 g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
743 livesync_issue_peersync(); 788 livesync_issue_peersync();
744} 789}
745 790
746static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { 791static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) {
747// unsigned int i; 792 // unsigned int i;
748 793
749 *g_peerbuffer_pos = prefix; 794 *g_peerbuffer_pos = prefix;
750 memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); 795 memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1);
751 memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); 796 memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1);
752 797
753#if 0 798#if 0
754 /* Dump info_hash */ 799 /* Dump info_hash */
@@ -763,77 +808,84 @@ static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *pee
763#endif 808#endif
764 g_peerbuffer_pos += sizeof(ot_peer); 809 g_peerbuffer_pos += sizeof(ot_peer);
765 810
766 if( g_peerbuffer_pos >= g_peerbuffer_highwater ) 811 if (g_peerbuffer_pos >= g_peerbuffer_highwater)
767 livesync_issue_peersync(); 812 livesync_issue_peersync();
768} 813}
769 814
770static void process_indata( proxy_peer * peer ) { 815static void process_indata(proxy_peer *peer) {
771 size_t consumed, peers; 816 size_t consumed, peers;
772 uint8_t *data = peer->indata, *hash; 817 uint8_t *data = peer->indata, *hash;
773 uint8_t *dataend = data + peer->indata_length; 818 uint8_t *dataend = data + peer->indata_length;
774 819
775 while( 1 ) { 820 while (1) {
776 /* If we're not inside of a packet, make a new one */ 821 /* If we're not inside of a packet, make a new one */
777 if( !peer->packet_tcount ) { 822 if (!peer->packet_tcount) {
778 /* Ensure the header is complete or postpone processing */ 823 /* Ensure the header is complete or postpone processing */
779 if( data + 4 > dataend ) break; 824 if (data + 4 > dataend)
780 peer->packet_type = data[0]; 825 break;
781 peer->packet_tprefix = data[1]; 826 peer->packet_type = data[0];
782 peer->packet_tcount = data[2] * 256 + data[3]; 827 peer->packet_tprefix = data[1];
783 data += 4; 828 peer->packet_tcount = data[2] * 256 + data[3];
784printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount ); 829 data += 4;
830 printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount);
785 } 831 }
786 832
787 /* Ensure size for a minimal torrent block */ 833 /* Ensure size for a minimal torrent block */
788 if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break; 834 if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend)
835 break;
789 836
790 /* Advance pointer to peer count or peers */ 837 /* Advance pointer to peer count or peers */
791 hash = data; 838 hash = data;
792 data += sizeof(ot_hash) - 1; 839 data += sizeof(ot_hash) - 1;
793 840
794 /* Type 0 has peer count encoded before each peers */ 841 /* Type 0 has peer count encoded before each peers */
795 peers = peer->packet_type; 842 peers = peer->packet_type;
796 if( !peers ) { 843 if (!peers) {
797 int shift = 0; 844 int shift = 0;
798 do peers |= ( 0x7f & *data ) << ( 7 * shift ); 845 do
799 while ( *(data++) & 0x80 && shift++ < 6 ); 846 peers |= (0x7f & *data) << (7 * shift);
847 while (*(data++) & 0x80 && shift++ < 6);
800 } 848 }
801#if 0 849#if 0
802printf( "peers: %zd\n", peers ); 850printf( "peers: %zd\n", peers );
803#endif 851#endif
804 /* Ensure enough data being read to hold all peers */ 852 /* Ensure enough data being read to hold all peers */
805 if( data + (OT_IP_SIZE + 3) * peers > dataend ) { 853 if (data + (OT_IP_SIZE + 3) * peers > dataend) {
806 data = hash; 854 data = hash;
807 break; 855 break;
808 } 856 }
809 while( peers-- ) { 857 while (peers--) {
810 livesync_proxytell( peer->packet_tprefix, hash, data ); 858 livesync_proxytell(peer->packet_tprefix, hash, data);
811 data += OT_IP_SIZE + 3; 859 data += OT_IP_SIZE + 3;
812 } 860 }
813 --peer->packet_tcount; 861 --peer->packet_tcount;
814 } 862 }
815 863
816 consumed = data - peer->indata; 864 consumed = data - peer->indata;
817 memmove( peer->indata, data, peer->indata_length - consumed ); 865 memmove(peer->indata, data, peer->indata_length - consumed);
818 peer->indata_length -= consumed; 866 peer->indata_length -= consumed;
819} 867}
820 868
821static void * livesync_worker( void * args ) { 869static void *livesync_worker(void *args) {
822 (void)args; 870 (void)args;
823 while( 1 ) { 871 while (1) {
824 ot_ip6 in_ip; uint16_t in_port; 872 ot_ip6 in_ip;
825 size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 873 uint16_t in_port;
874 size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);
826 875
827 /* Expect at least tracker id and packet type */ 876 /* Expect at least tracker id and packet type */
828 if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) 877 if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t)))
829 continue; 878 continue;
830 if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { 879 if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) {
831 /* drop packet coming from ourselves */ 880 /* drop packet coming from ourselves */
832 continue; 881 continue;
833 } 882 }
834 switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { 883 switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) {
835 case OT_SYNC_PEER: 884 case OT_SYNC_PEER4:
836 livesync_handle_peersync( datalen ); 885 livesync_handle_peersync(datalen, OT_PEER_SIZE4);
886 break;
887 case OT_SYNC_PEER6:
888 livesync_handle_peersync(datalen, OT_PEER_SIZE6);
837 break; 889 break;
838 default: 890 default:
839 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); 891 // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );