summaryrefslogtreecommitdiff
path: root/ot_livesync.c
diff options
context:
space:
mode:
authorerdgeist <>2008-10-06 02:05:53 +0000
committererdgeist <>2008-10-06 02:05:53 +0000
commit465cc2ecdf14909144debe70a4833642e11697da (patch)
tree0c386a01cccd0f8974d5fb74a895eb96b398c414 /ot_livesync.c
parent17724dde29c488f08338653ac6a98fb7a9fd6d22 (diff)
Live sync is now handled in its own thread. Therefore it now creates and handles its own sockets.
Diffstat (limited to 'ot_livesync.c')
-rw-r--r--ot_livesync.c134
1 files changed, 87 insertions, 47 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index 577bb5f..92c947c 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -7,9 +7,11 @@
7#include <sys/types.h> 7#include <sys/types.h>
8#include <sys/uio.h> 8#include <sys/uio.h>
9#include <string.h> 9#include <string.h>
10#include <pthread.h>
10 11
11/* Libowfat */ 12/* Libowfat */
12#include "socket.h" 13#include "socket.h"
14#include "ndelay.h"
13 15
14/* Opentracker */ 16/* Opentracker */
15#include "trackerlogic.h" 17#include "trackerlogic.h"
@@ -17,10 +19,23 @@
17#include "ot_accesslist.h" 19#include "ot_accesslist.h"
18 20
19#ifdef WANT_SYNC_LIVE 21#ifdef WANT_SYNC_LIVE
20char groupip_1[4] = { LIVESYNC_MCASTDOMAIN_1 }; 22
23char groupip_1[4] = { 224,0,23,42 };
24
25#define LIVESYNC_BUFFINSIZE (256*256)
26#define LIVESYNC_BUFFSIZE 1504
27#define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash))
28
29#define LIVESYNC_MAXDELAY 15
30
31/* Forward declaration */
32static void * livesync_worker( void * args );
21 33
22/* For outgoing packets */ 34/* For outgoing packets */
23int64 g_livesync_socket = -1; 35static int64 g_livesync_socket_in = -1;
36
37/* For incoming packets */
38static int64 g_livesync_socket_out = -1;
24 39
25static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE]; 40static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE];
26static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ]; 41static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ];
@@ -28,34 +43,49 @@ static uint8_t *livesync_outbuffer_pos;
28static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER; 43static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER;
29static ot_time livesync_lastpacket_time; 44static ot_time livesync_lastpacket_time;
30 45
46static pthread_t thread_id;
31void livesync_init( ) { 47void livesync_init( ) {
32 if( g_livesync_socket == -1 ) 48 if( g_livesync_socket_in == -1 )
33 exerr( "No socket address for live sync specified." ); 49 exerr( "No socket address for live sync specified." );
34 livesync_outbuffer_pos = livesync_outbuffer_start; 50 livesync_outbuffer_pos = livesync_outbuffer_start;
35 memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); 51 memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
36 livesync_outbuffer_pos += sizeof( g_tracker_id ); 52 livesync_outbuffer_pos += sizeof( g_tracker_id );
37 livesync_lastpacket_time = g_now; 53 livesync_lastpacket_time = g_now;
54
55 pthread_create( &thread_id, NULL, livesync_worker, NULL );
38} 56}
39 57
40void livesync_deinit() { 58void livesync_deinit() {
41 59 pthread_cancel( thread_id );
42} 60}
43 61
44void livesync_bind_mcast( char *ip, uint16_t port) { 62void livesync_bind_mcast( char *ip, uint16_t port) {
45 char tmpip[4] = {0,0,0,0}; 63 char tmpip[4] = {0,0,0,0};
46 if( g_livesync_socket != -1 ) 64
47 exerr("Livesync listen ip specified twice."); 65 if( g_livesync_socket_in != -1 )
48 if( socket_mcjoin4( ot_try_bind(tmpip, port, FLAG_MCA ), groupip_1, ip ) ) 66 exerr("Error: Livesync listen ip specified twice.");
49 exerr("Cant join mcast group."); 67
50 g_livesync_socket = ot_try_bind( ip, port, FLAG_UDP ); 68 if( ( g_livesync_socket_in = socket_udp4( )) < 0)
51 io_dontwantread(g_livesync_socket); 69 exerr("Error: Cant create live sync incoming socket." );
52 70 ndelay_off(g_livesync_socket_in);
53 socket_mcttl4(g_livesync_socket, 1); 71
54 socket_mcloop4(g_livesync_socket, 0); 72 if( socket_bind4_reuse( g_livesync_socket_in, tmpip, port ) == -1 )
73 exerr("Error: Cant bind live sync incoming socket." );
74
75 if( socket_mcjoin4( g_livesync_socket_in, groupip_1, ip ) )
76 exerr("Error: Cant make live sync incoming socket join mcast group.");
77
78 if( ( g_livesync_socket_out = socket_udp4()) < 0)
79 exerr("Error: Cant create live sync outgoing socket." );
80 if( socket_bind4_reuse( g_livesync_socket_out, ip, port ) == -1 )
81 exerr("Error: Cant bind live sync outgoing socket." );
82
83 socket_mcttl4(g_livesync_socket_out, 1);
84 socket_mcloop4(g_livesync_socket_out, 0);
55} 85}
56 86
57static void livesync_issuepacket( ) { 87static void livesync_issuepacket( ) {
58 socket_send4(g_livesync_socket, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start, 88 socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start,
59 groupip_1, LIVESYNC_PORT); 89 groupip_1, LIVESYNC_PORT);
60 livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id ); 90 livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id );
61 livesync_lastpacket_time = g_now; 91 livesync_lastpacket_time = g_now;
@@ -81,41 +111,51 @@ void livesync_ticker( ) {
81 livesync_issuepacket(); 111 livesync_issuepacket();
82} 112}
83 113
84/* Handle an incoming live sync packet */ 114static void * livesync_worker( void * args ) {
85void handle_livesync( int64 serversocket ) {
86 uint8_t in_ip[4]; uint16_t in_port; 115 uint8_t in_ip[4]; uint16_t in_port;
87 ssize_t datalen = socket_recv4(serversocket, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port); 116 ssize_t datalen;
88 int off = 4; 117 int off;
89
90 if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
91 // TODO: log invalid sync packet
92 return;
93 }
94
95 if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
96 // TODO: log invalid sync packet
97 return;
98 }
99
100 if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
101 // TODO: log packet coming from ourselves
102 return;
103 }
104
105 // Now basic sanity checks have been done on the live sync packet
106 // We might add more testing and logging.
107 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
108 ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
109 ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
110
111 if( OT_FLAG(peer) & PEER_FLAG_STOPPED )
112 remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
113 else
114 add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
115
116 off += sizeof( ot_hash ) + sizeof( ot_peer );
117 }
118 118
119 args = args;
120
121 while( 1 ) {
122 datalen = socket_recv4(g_livesync_socket_in, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port);
123 off = 4;
124
125 if( datalen <= 0 )
126 continue;
127
128 if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
129 // TODO: log invalid sync packet
130 continue;
131 }
132
133 if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
134 // TODO: log invalid sync packet
135 continue;
136 }
137
138 if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
139 // TODO: log packet coming from ourselves
140 continue;
141 }
142
143 // Now basic sanity checks have been done on the live sync packet
144 // We might add more testing and logging.
145 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
146 ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
147 ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);
148
149 if( OT_FLAG(peer) & PEER_FLAG_STOPPED )
150 remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA);
151 else
152 add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1));
153
154 off += sizeof( ot_hash ) + sizeof( ot_peer );
155 }
156 }
157 /* Never returns. */
158 return NULL;
119} 159}
120 160
121#endif 161#endif