summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ot_livesync.c27
1 files changed, 22 insertions, 5 deletions
diff --git a/ot_livesync.c b/ot_livesync.c
index 87fe5cf..cded0f7 100644
--- a/ot_livesync.c
+++ b/ot_livesync.c
@@ -46,13 +46,14 @@ static int64 g_socket_in = -1;
46/* For incoming packets */ 46/* For incoming packets */
47static int64 g_socket_out = -1; 47static int64 g_socket_out = -1;
48 48
49static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
49char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; 50char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
50static size_t g_outbuf_data; 51static size_t g_outbuf_data;
51static ot_time g_next_packet_time; 52static ot_time g_next_packet_time;
52 53
53static pthread_t thread_id; 54static pthread_t thread_id;
54void livesync_init( ) { 55void livesync_init( ) {
55 56
56 if( g_socket_in == -1 ) 57 if( g_socket_in == -1 )
57 exerr( "No socket address for live sync specified." ); 58 exerr( "No socket address for live sync specified." );
58 59
@@ -62,7 +63,7 @@ void livesync_init( ) {
62 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); 63 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
63 64
64 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 65 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
65 66
66 pthread_create( &thread_id, NULL, livesync_worker, NULL ); 67 pthread_create( &thread_id, NULL, livesync_worker, NULL );
67} 68}
68 69
@@ -105,10 +106,20 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
105 socket_mcloop4(g_socket_out, 0); 106 socket_mcloop4(g_socket_out, 0);
106} 107}
107 108
109/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
108static void livesync_issue_peersync( ) { 110static void livesync_issue_peersync( ) {
109 socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT); 111 char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
112 size_t data = g_outbuf_data;
113
114 memcpy( mycopy, g_outbuf, data );
110 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); 115 g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
111 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; 116 g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
117
118 /* From now this thread has a local copy of the buffer and
119 has modified the protected element */
120 pthread_mutex_unlock(&g_outbuf_mutex);
121
122 socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT);
112} 123}
113 124
114static void livesync_handle_peersync( struct ot_workstruct *ws ) { 125static void livesync_handle_peersync( struct ot_workstruct *ws ) {
@@ -140,13 +151,17 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) {
140 enough */ 151 enough */
141void livesync_ticker( ) { 152void livesync_ticker( ) {
142 /* livesync_issue_peersync sets g_next_packet_time */ 153 /* livesync_issue_peersync sets g_next_packet_time */
154 pthread_mutex_lock(&g_outbuf_mutex);
143 if( g_now_seconds > g_next_packet_time && 155 if( g_now_seconds > g_next_packet_time &&
144 g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) 156 g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
145 livesync_issue_peersync(); 157 livesync_issue_peersync();
158 else
159 pthread_mutex_unlock(&g_outbuf_mutex);
146} 160}
147 161
148/* Inform live sync about whats going on. */ 162/* Inform live sync about whats going on. */
149void livesync_tell( struct ot_workstruct *ws ) { 163void livesync_tell( struct ot_workstruct *ws ) {
164 pthread_mutex_lock(&g_outbuf_mutex);
150 165
151 memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); 166 memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) );
152 memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); 167 memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) );
@@ -155,6 +170,8 @@ void livesync_tell( struct ot_workstruct *ws ) {
155 170
156 if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) 171 if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
157 livesync_issue_peersync(); 172 livesync_issue_peersync();
173 else
174 pthread_mutex_unlock(&g_outbuf_mutex);
158} 175}
159 176
160static void * livesync_worker( void * args ) { 177static void * livesync_worker( void * args ) {
@@ -162,11 +179,11 @@ static void * livesync_worker( void * args ) {
162 ot_ip6 in_ip; uint16_t in_port; 179 ot_ip6 in_ip; uint16_t in_port;
163 180
164 (void)args; 181 (void)args;
165 182
166 /* Initialize our "thread local storage" */ 183 /* Initialize our "thread local storage" */
167 ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); 184 ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE );
168 ws.outbuf = ws.reply = 0; 185 ws.outbuf = ws.reply = 0;
169 186
170 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); 187 memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) );
171 188
172 while( 1 ) { 189 while( 1 ) {