summaryrefslogtreecommitdiff
path: root/ot_livesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'ot_livesync.c')
-rw-r--r--ot_livesync.c205
1 files changed, 114 insertions, 91 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index cded0f7..269b8d8 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -4,204 +4,228 @@
4 $id$ */ 4 $id$ */
5 5
6/* System */ 6/* System */
7#include <pthread.h>
8#include <stdlib.h>
9#include <string.h>
7#include <sys/types.h> 10#include <sys/types.h>
8#include <sys/uio.h> 11#include <sys/uio.h>
9#include <string.h>
10#include <pthread.h>
11#include <unistd.h> 12#include <unistd.h>
12#include <stdlib.h>
13 13
14/* Libowfat */ 14/* Libowfat */
15#include "socket.h"
16#include "ndelay.h"
17#include "byte.h" 15#include "byte.h"
18#include "ip6.h" 16#include "ip6.h"
17#include "ndelay.h"
18#include "socket.h"
19 19
20/* Opentracker */ 20/* Opentracker */
21#include "trackerlogic.h"
22#include "ot_livesync.h"
23#include "ot_accesslist.h" 21#include "ot_accesslist.h"
24#include "ot_stats.h" 22#include "ot_livesync.h"
25#include "ot_mutex.h" 23#include "ot_mutex.h"
24#include "ot_stats.h"
25#include "trackerlogic.h"
26 26
27#ifdef WANT_SYNC_LIVE 27#ifdef WANT_SYNC_LIVE
28 28
29char groupip_1[4] = { 224,0,23,5 }; 29char groupip_1[4] = {224, 0, 23, 5};
30 30
31#define LIVESYNC_INCOMING_BUFFSIZE (256*256) 31#define LIVESYNC_INCOMING_BUFFSIZE (256 * 256)
32 32
33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 33#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) 34#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash))
35 35
36#define LIVESYNC_MAXDELAY 15 /* seconds */ 36#define LIVESYNC_MAXDELAY 15 /* seconds */
37 37
38enum { OT_SYNC_PEER }; 38enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
39 39
40/* Forward declaration */ 40/* Forward declaration */
41static void * livesync_worker( void * args ); 41static void *livesync_worker(void *args);
42 42
43/* For outgoing packets */ 43/* For outgoing packets */
44static int64 g_socket_in = -1; 44static int64 g_socket_in = -1;
45 45
46/* For incoming packets */ 46/* For incoming packets */
47static int64 g_socket_out = -1; 47static int64 g_socket_out = -1;
48 48
49static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; 49static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
50char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; 50typedef struct {
51static size_t g_outbuf_data; 51 uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
52static ot_time g_next_packet_time; 52 size_t fill;
53 ot_time next_packet_time;
54} sync_buffer;
53 55
54static pthread_t thread_id; 56static sync_buffer g_v6_buf;
55void livesync_init( ) { 57static sync_buffer g_v4_buf;
56 58
57 if( g_socket_in == -1 ) 59static pthread_t thread_id;
58 exerr( "No socket address for live sync specified." ); 60void livesync_init() {
61
62 if (g_socket_in == -1)
63 exerr("No socket address for live sync specified.");
59 64
60 /* Prepare outgoing peers buffer */ 65 /* Prepare outgoing peers buffer */
61 memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) ); 66 memcpy(g_v6_buf.data, &g_tracker_id, sizeof(g_tracker_id));
62 uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER); 67 memcpy(g_v4_buf.data, &g_tracker_id, sizeof(g_tracker_id));
63 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); 68
69 uint32_pack_big((char *)g_v6_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER6);
70 uint32_pack_big((char *)g_v4_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER4);
71
72 g_v6_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t);
73 g_v4_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t);
64 74
65 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 75 g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
76 g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
66 77
67 pthread_create( &thread_id, NULL, livesync_worker, NULL ); 78 pthread_create(&thread_id, NULL, livesync_worker, NULL);
68} 79}
69 80
70void livesync_deinit() { 81void livesync_deinit() {
71 if( g_socket_in != -1 ) 82 if (g_socket_in != -1)
72 close( g_socket_in ); 83 close(g_socket_in);
73 if( g_socket_out != -1 ) 84 if (g_socket_out != -1)
74 close( g_socket_out ); 85 close(g_socket_out);
75 86
76 pthread_cancel( thread_id ); 87 pthread_cancel(thread_id);
77} 88}
78 89
79void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { 90void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
80 char tmpip[4] = {0,0,0,0}; 91 char tmpip[4] = {0, 0, 0, 0};
81 char *v4ip; 92 char *v4ip;
82 93
83 if( !ip6_isv4mapped(ip)) 94 if (!ip6_isv4mapped(ip))
84 exerr("v6 mcast support not yet available."); 95 exerr("v6 mcast support not yet available.");
85 v4ip = ip+12; 96 v4ip = ip + 12;
86 97
87 if( g_socket_in != -1 ) 98 if (g_socket_in != -1)
88 exerr("Error: Livesync listen ip specified twice."); 99 exerr("Error: Livesync listen ip specified twice.");
89 100
90 if( ( g_socket_in = socket_udp4( )) < 0) 101 if ((g_socket_in = socket_udp4()) < 0)
91 exerr("Error: Cant create live sync incoming socket." ); 102 exerr("Error: Cant create live sync incoming socket.");
92 ndelay_off(g_socket_in); 103 ndelay_off(g_socket_in);
93 104
94 if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) 105 if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1)
95 exerr("Error: Cant bind live sync incoming socket." ); 106 exerr("Error: Cant bind live sync incoming socket.");
96 107
97 if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) 108 if (socket_mcjoin4(g_socket_in, groupip_1, v4ip))
98 exerr("Error: Cant make live sync incoming socket join mcast group."); 109 exerr("Error: Cant make live sync incoming socket join mcast group.");
99 110
100 if( ( g_socket_out = socket_udp4()) < 0) 111 if ((g_socket_out = socket_udp4()) < 0)
101 exerr("Error: Cant create live sync outgoing socket." ); 112 exerr("Error: Cant create live sync outgoing socket.");
102 if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) 113 if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1)
103 exerr("Error: Cant bind live sync outgoing socket." ); 114 exerr("Error: Cant bind live sync outgoing socket.");
104 115
105 socket_mcttl4(g_socket_out, 1); 116 socket_mcttl4(g_socket_out, 1);
106 socket_mcloop4(g_socket_out, 0); 117 socket_mcloop4(g_socket_out, 0);
107} 118}
108 119
109/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ 120/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
110static void livesync_issue_peersync( ) { 121static void livesync_issue_peersync(sync_buffer *buf) {
111 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; 122 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
112 size_t data = g_outbuf_data; 123 size_t fill = buf->fill;
113 124
114 memcpy( mycopy, g_outbuf, data ); 125 memcpy(mycopy, buf->data, fill);
115 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); 126 buf->fill = sizeof(g_tracker_id) + sizeof(uint32_t);
116 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 127 buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
117 128
118 /* From now this thread has a local copy of the buffer and 129 /* From now this thread has a local copy of the buffer and
119 has modified the protected element */ 130 has modified the protected element */
120 pthread_mutex_unlock(&g_outbuf_mutex); 131 pthread_mutex_unlock(&g_outbuf_mutex);
121 132
122 socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT); 133 socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT);
123} 134}
124 135
125static void livesync_handle_peersync( struct ot_workstruct *ws ) { 136static void livesync_handle_peersync(struct ot_workstruct *ws, size_t peer_size) {
126 int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); 137 size_t off = sizeof(g_tracker_id) + sizeof(uint32_t);
127 138
128 /* Now basic sanity checks have been done on the live sync packet 139 /* Now basic sanity checks have been done on the live sync packet
129 We might add more testing and logging. */ 140 We might add more testing and logging. */
130 while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= ws->request_size ) { 141 while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= ws->request_size) {
131 memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), sizeof( ot_peer ) ); 142 memcpy(&ws->peer, ws->request + off + sizeof(ot_hash), peer_size);
132 ws->hash = (ot_hash*)(ws->request + off); 143 ws->hash = (ot_hash *)(ws->request + off);
133 144
134 if( !g_opentracker_running ) return; 145 if (!g_opentracker_running)
146 return;
135 147
136 if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_STOPPED ) 148 if (OT_PEERFLAG(ws->peer) & PEER_FLAG_STOPPED)
137 remove_peer_from_torrent( FLAG_MCA, ws ); 149 remove_peer_from_torrent(FLAG_MCA, ws);
138 else 150 else
139 add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); 151 add_peer_to_torrent_and_return_peers(FLAG_MCA, ws, /* amount = */ 0);
140 152
141 off += sizeof( ot_hash ) + sizeof( ot_peer ); 153 off += sizeof(ot_hash) + peer_size;
142 } 154 }
143 155
144 stats_issue_event(EVENT_SYNC, 0, 156 stats_issue_event(EVENT_SYNC, 0, (ws->request_size - sizeof(g_tracker_id) - sizeof(uint32_t)) / ((ssize_t)sizeof(ot_hash) + peer_size));
145 (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
146 ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer )));
147} 157}
148 158
149/* Tickle the live sync module from time to time, so no events get 159/* Tickle the live sync module from time to time, so no events get
150 stuck when there's not enough traffic to fill udp packets fast 160 stuck when there's not enough traffic to fill udp packets fast
151 enough */ 161 enough */
152void livesync_ticker( ) { 162void livesync_ticker() {
153 /* livesync_issue_peersync sets g_next_packet_time */ 163 /* livesync_issue_peersync sets g_next_packet_time */
154 pthread_mutex_lock(&g_outbuf_mutex); 164 pthread_mutex_lock(&g_outbuf_mutex);
155 if( g_now_seconds > g_next_packet_time && 165 if (g_now_seconds > g_v6_buf.next_packet_time && g_v6_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t))
156 g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) 166 livesync_issue_peersync(&g_v6_buf);
157 livesync_issue_peersync(); 167 else
168 pthread_mutex_unlock(&g_outbuf_mutex);
169
170 pthread_mutex_lock(&g_outbuf_mutex);
171 if (g_now_seconds > g_v4_buf.next_packet_time && g_v4_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t))
172 livesync_issue_peersync(&g_v4_buf);
158 else 173 else
159 pthread_mutex_unlock(&g_outbuf_mutex); 174 pthread_mutex_unlock(&g_outbuf_mutex);
160} 175}
161 176
162/* Inform live sync about whats going on. */ 177/* Inform live sync about whats going on. */
163void livesync_tell( struct ot_workstruct *ws ) { 178void livesync_tell(struct ot_workstruct *ws) {
179 size_t peer_size; /* initialized in next line */
180 ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size);
181 sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf;
182
164 pthread_mutex_lock(&g_outbuf_mutex); 183 pthread_mutex_lock(&g_outbuf_mutex);
165 184
166 memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); 185 memcpy(dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash));
167 memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); 186 dest_buf->fill += sizeof(ot_hash);
168 187
169 g_outbuf_data += sizeof(ot_hash) + sizeof(ot_peer); 188 memcpy(dest_buf->data + dest_buf->fill, peer_src, peer_size);
189 dest_buf->fill += peer_size;
170 190
171 if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) 191 if (dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS)
172 livesync_issue_peersync(); 192 livesync_issue_peersync(dest_buf);
173 else 193 else
174 pthread_mutex_unlock(&g_outbuf_mutex); 194 pthread_mutex_unlock(&g_outbuf_mutex);
175} 195}
176 196
177static void * livesync_worker( void * args ) { 197static void *livesync_worker(void *args) {
178 struct ot_workstruct ws; 198 struct ot_workstruct ws;
179 ot_ip6 in_ip; uint16_t in_port; 199 ot_ip6 in_ip;
200 uint16_t in_port;
180 201
181 (void)args; 202 (void)args;
182 203
183 /* Initialize our "thread local storage" */ 204 /* Initialize our "thread local storage" */
184 ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); 205 ws.inbuf = ws.request = malloc(LIVESYNC_INCOMING_BUFFSIZE);
185 ws.outbuf = ws.reply = 0; 206 ws.outbuf = ws.reply = 0;
186 207
187 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); 208 memcpy(in_ip, V4mappedprefix, sizeof(V4mappedprefix));
188 209
189 while( 1 ) { 210 while (1) {
190 ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); 211 ws.request_size = socket_recv4(g_socket_in, (char *)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);
191 212
192 /* Expect at least tracker id and packet type */ 213 /* Expect at least tracker id and packet type */
193 if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) 214 if (ws.request_size <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t)))
194 continue; 215 continue;
195 if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) 216 if (!accesslist_is_blessed(in_ip, OT_PERMISSION_MAY_LIVESYNC))
196 continue; 217 continue;
197 if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) { 218 if (!memcmp(ws.inbuf, &g_tracker_id, sizeof(g_tracker_id))) {
198 /* TODO: log packet coming from ourselves */ 219 /* TODO: log packet coming from ourselves */
199 continue; 220 continue;
200 } 221 }
201 222
202 switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { 223 switch (uint32_read_big(sizeof(g_tracker_id) + (char *)ws.inbuf)) {
203 case OT_SYNC_PEER: 224 case OT_SYNC_PEER6:
204 livesync_handle_peersync( &ws ); 225 livesync_handle_peersync(&ws, OT_PEER_SIZE6);
226 break;
227 case OT_SYNC_PEER4:
228 livesync_handle_peersync(&ws, OT_PEER_SIZE4);
205 break; 229 break;
206 default: 230 default:
207 break; 231 break;
@@ -213,4 +237,3 @@ static void * livesync_worker( void * args ) {
213} 237}
214 238
215#endif 239#endif
216const char *g_version_livesync_c = "$Source$: $Revision$\n";