/* This software was written by Dirk Engling It is considered beerware. Prost. Skol. Cheers or whatever. $Id$ */ /* System */ #include #include #include #include #include #include #include #include #include #include #include #include /* Libowfat */ #include "byte.h" #include "io.h" #include "iob.h" #include "ip6.h" #include "ndelay.h" #include "scan.h" #include "socket.h" /* Opentracker */ #include "ot_mutex.h" #include "ot_stats.h" #include "ot_vector.h" #include "trackerlogic.h" #ifndef WANT_SYNC_LIVE #define WANT_SYNC_LIVE #endif #include "ot_livesync.h" ot_ip6 g_serverip; uint16_t g_serverport = 9009; uint32_t g_tracker_id; char groupip_1[4] = {224, 0, 23, 5}; int g_self_pipe[2]; /* If you have more than 10 peers, don't use this proxy Use 20 slots for 10 peers to have room for 10 incoming connection slots */ #define MAX_PEERS 20 #define LIVESYNC_INCOMING_BUFFSIZE (256 * 256) #define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256) #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash)) #define LIVESYNC_MAXDELAY 15 /* seconds */ /* The amount of time a complete sync cycle should take */ #define OT_SYNC_INTERVAL_MINUTES 2 /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ #define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT)) enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; enum { FLAG_SERVERSOCKET = 1 }; /* For incoming packets */ static int64 g_socket_in = -1; static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; /* For outgoing packets */ static int64 g_socket_out = -1; static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; static uint8_t *g_peerbuffer_pos; static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; static ot_time g_next_packet_time; static void *livesync_worker(void *args); static void *streamsync_worker(void *args); static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer); void exerr(char *message) { fprintf(stderr, "%s\n", message); exit(111); } void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) { (void)event; (void)proto; (void)event_data; } void livesync_bind_mcast(ot_ip6 ip, uint16_t port) { char tmpip[4] = {0, 0, 0, 0}; char *v4ip; if (!ip6_isv4mapped(ip)) exerr("v6 mcast support not yet available."); v4ip = ip + 12; if (g_socket_in != -1) exerr("Error: Livesync listen ip specified twice."); if ((g_socket_in = socket_udp4()) < 0) exerr("Error: Cant create live sync incoming socket."); ndelay_off(g_socket_in); if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1) exerr("Error: Cant bind live sync incoming socket."); if (socket_mcjoin4(g_socket_in, groupip_1, v4ip)) exerr("Error: Cant make live sync incoming socket join mcast group."); if ((g_socket_out = socket_udp4()) < 0) exerr("Error: Cant create live sync outgoing socket."); if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1) exerr("Error: Cant bind live sync outgoing socket."); socket_mcttl4(g_socket_out, 1); socket_mcloop4(g_socket_out, 1); } size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { int exactmatch; ot_torrent *torrent; ot_peerlist *peer_list; ot_peer *peer_dest; ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch); if (!torrent) return -1; if (!exactmatch) { /* Create a new torrent entry, then */ memcpy(torrent->hash, hash, sizeof(ot_hash)); if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) { vector_remove_torrent(torrents_list, torrent); mutex_bucket_unlock_by_hash(hash, 0); return -1; } byte_zero(torrent->peer_list6, sizeof(ot_peerlist)); byte_zero(torrent->peer_list4, sizeof(ot_peerlist)); } peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; /* Check for peer in torrent */ peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch); if (!peer_dest) { mutex_bucket_unlock_by_hash(hash, 0); return -1; } /* Tell peer that it's fresh */ OT_PEERTIME(peer, peer_size) = 0; /* If we hadn't had a match create peer there */ if (!exactmatch) { peer_list->peer_count++; if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING) peer_list->seed_count++; } memcpy(peer_dest, peer, peer_size); mutex_bucket_unlock_by_hash(hash, 0); return 0; } size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { int exactmatch; ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch); if (exactmatch) { ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) { case 2: peer_list->seed_count--; /* Intentional fallthrough */ case 1: peer_list->peer_count--; /* Intentional fallthrough */ default: break; } } mutex_bucket_unlock_by_hash(hash, 0); return 0; } void free_peerlist(ot_peerlist *peer_list) { if (peer_list->peers.data) { if (OT_PEERLIST_HASBUCKETS(peer_list)) { ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data); while (peer_list->peers.size--) free(bucket_list++->data); } free(peer_list->peers.data); } free(peer_list); } static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) { int off = sizeof(g_tracker_id) + sizeof(uint32_t); fprintf(stderr, "."); while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) { ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash)); ot_hash *hash = (ot_hash *)(g_inbuffer + off); if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED) remove_peer_from_torrent_proxy(*hash, peer, peer_size); else add_peer_to_torrent_proxy(*hash, peer, peer_size); off += sizeof(ot_hash) + peer_size; } } int usage(char *self) { fprintf(stderr, "Usage: %s -L -l : -c :\n", self); return 0; } enum { FLAG_OUTGOING = 0x80, FLAG_DISCONNECTED = 0x00, FLAG_CONNECTING = 0x01, FLAG_WAITTRACKERID = 0x02, FLAG_CONNECTED = 0x03, FLAG_MASK = 0x07 }; #define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING) #define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED) #define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED) #define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING) #define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID) #define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED) typedef struct { int state; /* Whether we want to connect, how far our handshake is, etc. */ ot_ip6 ip; /* The peer to connect to */ uint16_t port; /* The peers port */ uint8_t indata[8192 * 16]; /* Any data not processed yet */ size_t indata_length; /* Length of unprocessed data */ uint32_t tracker_id; /* How the other end greeted */ int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ io_batch outdata; /* The iobatch containing our sync data */ size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ uint8_t packet_type; /* Type of current packet */ uint32_t packet_tid; /* Tracker id for current packet */ } proxy_peer; static void process_indata(proxy_peer *peer); void reset_info_block(proxy_peer *peer) { peer->indata_length = 0; peer->tracker_id = 0; peer->fd = -1; peer->packet_tcount = 0; iob_reset(&peer->outdata); PROXYPEER_SETDISCONNECTED(peer->state); } /* Number of connections to peers * If a peer's IP is set, we try to reconnect, when the connection drops * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it * Multiple connections to/from the same ip are okay, if tracker_id doesn't match * Reconnect attempts occur only twice a minute */ static int g_connection_count; static ot_time g_connection_reconn; static proxy_peer g_connections[MAX_PEERS]; static void handle_reconnects(void) { int i; for (i = 0; i < g_connection_count; ++i) if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) { int64 newfd = socket_tcp6(); fprintf(stderr, "(Re)connecting to peer..."); if (newfd < 0) continue; /* No socket for you */ io_fd(newfd); if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) { io_close(newfd); continue; } if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) { close(newfd); continue; } io_wantwrite(newfd); /* So we will be informed when it is connected */ io_setcookie(newfd, g_connections + i); /* Prepare connection info block */ reset_info_block(g_connections + i); g_connections[i].fd = newfd; PROXYPEER_SETCONNECTING(g_connections[i].state); } g_connection_reconn = time(NULL) + 30; } /* Handle incoming connection requests, check against whitelist */ static void handle_accept(int64 serversocket) { int64 newfd; ot_ip6 ip; uint16 port; while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) { /* XXX some access control */ /* Put fd into a non-blocking mode */ io_nonblock(newfd); if (!io_fd(newfd)) io_close(newfd); else { /* Find a new home for our incoming connection */ int i; for (i = 0; i < MAX_PEERS; ++i) if (g_connections[i].state == FLAG_DISCONNECTED) break; if (i == MAX_PEERS) { fprintf(stderr, "No room for incoming connection."); close(newfd); continue; } /* Prepare connection info block */ reset_info_block(g_connections + i); PROXYPEER_SETCONNECTING(g_connections[i].state); g_connections[i].port = port; g_connections[i].fd = newfd; io_setcookie(newfd, g_connections + i); /* We expect the connecting side to begin with its tracker_id */ io_wantread(newfd); } } return; } /* New sync data on the stream */ static void handle_read(int64 peersocket) { int i; int64 datalen; uint32_t tracker_id; proxy_peer *peer = io_getcookie(peersocket); if (!peer) { /* Can't happen ;) */ io_close(peersocket); return; } switch (peer->state & FLAG_MASK) { case FLAG_DISCONNECTED: io_close(peersocket); break; /* Shouldnt happen */ case FLAG_CONNECTING: case FLAG_WAITTRACKERID: /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id)) goto close_socket; /* See, if we already have a connection to that peer */ for (i = 0; i < MAX_PEERS; ++i) if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) { fprintf(stderr, "Peer already connected. Closing connection.\n"); goto close_socket; } /* Also no need for soliloquy */ if (tracker_id == g_tracker_id) goto close_socket; /* The new connection is good, send our tracker_id on incoming connections */ if (peer->state == FLAG_CONNECTING) if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id)) goto close_socket; peer->tracker_id = tracker_id; PROXYPEER_SETCONNECTED(peer->state); if (peer->state & FLAG_OUTGOING) fprintf(stderr, "succeeded.\n"); else fprintf(stderr, "Incoming connection successful.\n"); break; close_socket: fprintf(stderr, "Handshake incomplete, closing socket\n"); io_close(peersocket); reset_info_block(peer); break; case FLAG_CONNECTED: /* Here we acutally expect data from peer indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length); if (!datalen || datalen < -1) { fprintf(stderr, "Connection closed by remote peer.\n"); io_close(peersocket); reset_info_block(peer); } else if (datalen > 0) { peer->indata_length += datalen; process_indata(peer); } break; } } /* Can write new sync data to the stream */ static void handle_write(int64 peersocket) { proxy_peer *peer = io_getcookie(peersocket); if (!peer) { /* Can't happen ;) */ io_close(peersocket); return; } switch (peer->state & FLAG_MASK) { case FLAG_DISCONNECTED: default: /* Should not happen */ io_close(peersocket); break; case FLAG_CONNECTING: /* Ensure that the connection is established and handle connection error */ if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) { fprintf(stderr, "failed\n"); reset_info_block(peer); io_close(peersocket); break; } if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) { PROXYPEER_SETWAITTRACKERID(peer->state); io_dontwantwrite(peersocket); io_wantread(peersocket); } else { fprintf(stderr, "Handshake incomplete, closing socket\n"); io_close(peersocket); reset_info_block(peer); } break; case FLAG_CONNECTED: switch (iob_send(peersocket, &peer->outdata)) { case 0: /* all data sent */ io_dontwantwrite(peersocket); break; case -3: /* an error occured */ io_close(peersocket); reset_info_block(peer); break; default: /* Normal operation or eagain */ break; } break; } return; } static void server_mainloop() { int64 sock; /* inlined livesync_init() */ memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start)); g_peerbuffer_pos = g_peerbuffer_start; memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id)); uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER); g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t); g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; while (1) { /* See if we need to connect to anyone */ if (time(NULL) > g_connection_reconn) handle_reconnects(); /* Wait for io events until next approx reconn check time */ io_waituntil2(30 * 1000); /* Loop over readable sockets */ while ((sock = io_canread()) != -1) { const void *cookie = io_getcookie(sock); if ((uintptr_t)cookie == FLAG_SERVERSOCKET) handle_accept(sock); else handle_read(sock); } /* Loop over writable sockets */ while ((sock = io_canwrite()) != -1) handle_write(sock); livesync_ticker(); } } static void panic(const char *routine) { fprintf(stderr, "%s: %s\n", routine, strerror(errno)); exit(111); } static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) { int64 sock = socket_tcp6(); if (socket_bind6_reuse(sock, ip, port, 0) == -1) panic("socket_bind6_reuse"); if (socket_listen(sock, SOMAXCONN) == -1) panic("socket_listen"); if (!io_fd(sock)) panic("io_fd"); io_setcookie(sock, (void *)FLAG_SERVERSOCKET); io_wantread(sock); return sock; } static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) { const char *s = src; int off, bracket = 0; while (isspace(*s)) ++s; if (*s == '[') ++s, ++bracket; /* for v6 style notation */ if (!(off = scan_ip6(s, ip))) return 0; s += off; if (*s == 0 || isspace(*s)) return s - src; if (*s == ']' && bracket) ++s; if (!ip6_isv4mapped(ip)) { if ((bracket && *(s) != ':') || (*(s) != '.')) return 0; s++; } else { if (*(s++) != ':') return 0; } if (!(off = scan_ushort(s, port))) return 0; return off + s - src; } int main(int argc, char **argv) { static pthread_t sync_in_thread_id; static pthread_t sync_out_thread_id; ot_ip6 serverip; uint16_t tmpport; int scanon = 1, lbound = 0, sbound = 0; srandom(time(NULL)); #ifdef WANT_ARC4RANDOM g_tracker_id = arc4random(); #else g_tracker_id = random(); #endif while (scanon) { switch (getopt(argc, argv, ":l:c:L:h")) { case -1: scanon = 0; break; case 'l': tmpport = 0; if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { usage(argv[0]); exit(1); } ot_try_bind(serverip, tmpport); ++sbound; break; case 'c': if (g_connection_count > MAX_PEERS / 2) exerr("Connection limit exceeded.\n"); tmpport = 0; if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) { usage(argv[0]); exit(1); } g_connections[g_connection_count++].state = FLAG_OUTGOING; break; case 'L': tmpport = 9696; if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { usage(argv[0]); exit(1); } livesync_bind_mcast(serverip, tmpport); ++lbound; break; default: case '?': usage(argv[0]); exit(1); } } if (!lbound) exerr("No livesync port bound."); if (!g_connection_count && !sbound) exerr("No streamsync port bound."); pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL); pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL); server_mainloop(); return 0; } static void *streamsync_worker(void *args) { (void)args; while (1) { int bucket; /* For each bucket... */ for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { /* Get exclusive access to that bucket */ ot_vector *torrents_list = mutex_bucket_lock(bucket); size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; size_t mem, mem_a = 0, mem_b = 0; uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; if (!torrents_list->size) goto unlock_continue; /* For each torrent in this bucket.. */ for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { /* Address torrents members */ ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list; switch (peer_list->peer_count) { case 2: count_two++; break; case 1: count_one++; break; case 0: break; default: count_def++; count_peers += peer_list->peer_count; } } /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7; fprintf(stderr, "Mem: %zd\n", mem); ptr = ptr_a = ptr_b = ptr_c = malloc(mem); if (!ptr) goto unlock_continue; if (count_one > 4 || !count_def) { mem_a = 1 + 1 + 2 + count_one * (19 + 7); ptr_b += mem_a; ptr_c += mem_a; ptr_a[0] = 1; /* Offset 0: packet type 1 */ ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ ptr_a[2] = count_one >> 8; ptr_a[3] = count_one & 255; ptr_a += 4; } else count_def += count_one; if (count_two > 4 || !count_def) { mem_b = 1 + 1 + 2 + count_two * (19 + 14); ptr_c += mem_b; ptr_b[0] = 2; /* Offset 0: packet type 2 */ ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ ptr_b[2] = count_two >> 8; ptr_b[3] = count_two & 255; ptr_b += 4; } else count_def += count_two; if (count_def) { ptr_c[0] = 0; /* Offset 0: packet type 0 */ ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ ptr_c[2] = count_def >> 8; ptr_c[3] = count_def & 255; ptr_c += 4; } /* For each torrent in this bucket.. */ for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { /* Address torrents members */ ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset; ot_peerlist *peer_list = torrent->peer_list; ot_peer *peers = (ot_peer *)(peer_list->peers.data); uint8_t **dst; /* Determine destination slot */ count_peers = peer_list->peer_count; switch (count_peers) { case 0: continue; case 1: dst = mem_a ? &ptr_a : &ptr_c; break; case 2: dst = mem_b ? &ptr_b : &ptr_c; break; default: dst = &ptr_c; break; } /* Copy tail of info_hash, advance pointer */ memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1); *dst += sizeof(ot_hash) - 1; /* Encode peer count */ if (dst == &ptr_c) while (count_peers) { if (count_peers <= 0x7f) *(*dst)++ = count_peers; else *(*dst)++ = 0x80 | (count_peers & 0x7f); count_peers >>= 7; } /* Copy peers */ count_peers = peer_list->peer_count; while (count_peers--) { memcpy(*dst, peers++, OT_IP_SIZE + 3); *dst += OT_IP_SIZE + 3; } free_peerlist(peer_list); } free(torrents_list->data); memset(torrents_list, 0, sizeof(*torrents_list)); unlock_continue: mutex_bucket_unlock(bucket, 0); if (ptr) { int i; if (ptr_b > ptr_c) ptr_c = ptr_b; if (ptr_a > ptr_c) ptr_c = ptr_a; mem = ptr_c - ptr; for (i = 0; i < MAX_PEERS; ++i) { if (PROXYPEER_ISCONNECTED(g_connections[i].state)) { void *tmp = malloc(mem); if (tmp) { memcpy(tmp, ptr, mem); iob_addbuf_free(&g_connections[i].outdata, tmp, mem); io_wantwrite(g_connections[i].fd); } } } free(ptr); } usleep(OT_SYNC_SLEEP); } } return 0; } static void livesync_issue_peersync() { socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT); g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t); g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; } void livesync_ticker() { /* livesync_issue_peersync sets g_next_packet_time */ if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id)) livesync_issue_peersync(); } static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) { // unsigned int i; *g_peerbuffer_pos = prefix; memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1); memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1); #if 0 /* Dump info_hash */ for( i=0; i= g_peerbuffer_highwater) livesync_issue_peersync(); } static void process_indata(proxy_peer *peer) { size_t consumed, peers; uint8_t *data = peer->indata, *hash; uint8_t *dataend = data + peer->indata_length; while (1) { /* If we're not inside of a packet, make a new one */ if (!peer->packet_tcount) { /* Ensure the header is complete or postpone processing */ if (data + 4 > dataend) break; peer->packet_type = data[0]; peer->packet_tprefix = data[1]; peer->packet_tcount = data[2] * 256 + data[3]; data += 4; printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount); } /* Ensure size for a minimal torrent block */ if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend) break; /* Advance pointer to peer count or peers */ hash = data; data += sizeof(ot_hash) - 1; /* Type 0 has peer count encoded before each peers */ peers = peer->packet_type; if (!peers) { int shift = 0; do peers |= (0x7f & *data) << (7 * shift); while (*(data++) & 0x80 && shift++ < 6); } #if 0 printf( "peers: %zd\n", peers ); #endif /* Ensure enough data being read to hold all peers */ if (data + (OT_IP_SIZE + 3) * peers > dataend) { data = hash; break; } while (peers--) { livesync_proxytell(peer->packet_tprefix, hash, data); data += OT_IP_SIZE + 3; } --peer->packet_tcount; } consumed = data - peer->indata; memmove(peer->indata, data, peer->indata_length - consumed); peer->indata_length -= consumed; } static void *livesync_worker(void *args) { (void)args; while (1) { ot_ip6 in_ip; uint16_t in_port; size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port); /* Expect at least tracker id and packet type */ if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t))) continue; if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) { /* drop packet coming from ourselves */ continue; } switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) { case OT_SYNC_PEER4: livesync_handle_peersync(datalen, OT_PEER_SIZE4); break; case OT_SYNC_PEER6: livesync_handle_peersync(datalen, OT_PEER_SIZE6); break; default: // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); break; } } return 0; }