summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDirk Engling <erdgeist@erdgeist.org>2024-04-13 00:47:29 +0200
committerDirk Engling <erdgeist@erdgeist.org>2024-04-13 00:47:29 +0200
commit1a70d9f9ef81ac1b5e843ac71f3538f7845e03ae (patch)
tree20a20077503c01dc024e88a6a8d82bf89faf22fd
parent301faeb10c5994a6fd31adc5f0b4f8f2b5c23502 (diff)
First shot on chunked transfers
-rw-r--r--opentracker.c39
-rw-r--r--ot_fullscrape.c33
-rw-r--r--ot_http.c93
-rw-r--r--ot_http.h10
-rw-r--r--ot_iovec.c21
-rw-r--r--ot_iovec.h1
-rw-r--r--ot_mutex.c54
-rw-r--r--ot_mutex.h5
-rw-r--r--trackerlogic.c23
-rw-r--r--trackerlogic.h1
10 files changed, 205 insertions, 75 deletions
diff --git a/opentracker.c b/opentracker.c
index 7c67f26..73a3ff3 100644
--- a/opentracker.c
+++ b/opentracker.c
@@ -79,6 +79,7 @@ static void defaul_signal_handlers( void ) {
79 sigaddset (&signal_mask, SIGPIPE); 79 sigaddset (&signal_mask, SIGPIPE);
80 sigaddset (&signal_mask, SIGHUP); 80 sigaddset (&signal_mask, SIGHUP);
81 sigaddset (&signal_mask, SIGINT); 81 sigaddset (&signal_mask, SIGINT);
82 sigaddset (&signal_mask, SIGALRM);
82 pthread_sigmask (SIG_BLOCK, &signal_mask, NULL); 83 pthread_sigmask (SIG_BLOCK, &signal_mask, NULL);
83} 84}
84 85
@@ -90,7 +91,7 @@ static void install_signal_handlers( void ) {
90 sa.sa_handler = signal_handler; 91 sa.sa_handler = signal_handler;
91 sigemptyset(&sa.sa_mask); 92 sigemptyset(&sa.sa_mask);
92 sa.sa_flags = SA_RESTART; 93 sa.sa_flags = SA_RESTART;
93 if ((sigaction(SIGINT, &sa, NULL) == -1)) 94 if ((sigaction(SIGINT, &sa, NULL) == -1) || (sigaction(SIGALRM, &sa, NULL) == -1) )
94 panic( "install_signal_handlers" ); 95 panic( "install_signal_handlers" );
95 96
96 sigaddset (&signal_mask, SIGINT); 97 sigaddset (&signal_mask, SIGINT);
@@ -208,15 +209,23 @@ static void handle_read( const int64 sock, struct ot_workstruct *ws ) {
208static void handle_write( const int64 sock ) { 209static void handle_write( const int64 sock ) {
209 struct http_data* cookie=io_getcookie( sock ); 210 struct http_data* cookie=io_getcookie( sock );
210 size_t i; 211 size_t i;
212 int chunked = 0;
211 213
212 /* Look for the first io_batch still containing bytes to write */ 214 /* Look for the first io_batch still containing bytes to write */
213 if( cookie ) 215 if( cookie ) {
214 for( i = 0; i < cookie->batches; ++i ) 216 if( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )
217 chunked = 1;
218
219 for( i = 0; i < cookie->batches; ++i ) {
220 fprintf(stderr, "handle_write inspects batch %d of %d (bytes left: %d)\n", i, cookie->batches, cookie->batch[i].bytesleft);
215 if( cookie->batch[i].bytesleft ) { 221 if( cookie->batch[i].bytesleft ) {
216 int64 res = iob_send( sock, cookie->batch + i ); 222 int64 res = iob_send( sock, cookie->batch + i );
217 223
218 if( res == -3 ) 224 fprintf(stderr, "handle_write yields res %lld when trying to iob_send\n", res);
219 break; 225 if( res == -3 ) {
226 handle_dead( sock );
227 return;
228 }
220 229
221 if( !cookie->batch[i].bytesleft ) 230 if( !cookie->batch[i].bytesleft )
222 continue; 231 continue;
@@ -224,8 +233,17 @@ static void handle_write( const int64 sock ) {
224 if( res == -1 || res > 0 || i < cookie->batches - 1 ) 233 if( res == -1 || res > 0 || i < cookie->batches - 1 )
225 return; 234 return;
226 } 235 }
236 }
237 }
227 238
228 handle_dead( sock ); 239 /* In a chunked transfer after all batches accumulated have been sent, wait for the next one */
240 if( chunked ) {
241//fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => dont want write on sock %lld\n", sock);
242 //io_dontwantwrite( sock );
243 } else {
244fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => handle dead on sock %lld\n", sock);
245 handle_dead( sock );
246 }
229} 247}
230 248
231static void handle_accept( const int64 serversocket ) { 249static void handle_accept( const int64 serversocket ) {
@@ -266,7 +284,7 @@ static void * server_mainloop( void * args ) {
266 struct ot_workstruct ws; 284 struct ot_workstruct ws;
267 time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL; 285 time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
268 struct iovec *iovector; 286 struct iovec *iovector;
269 int iovec_entries; 287 int iovec_entries, is_partial;
270 288
271 (void)args; 289 (void)args;
272 290
@@ -305,8 +323,8 @@ static void * server_mainloop( void * args ) {
305 handle_read( sock, &ws ); 323 handle_read( sock, &ws );
306 } 324 }
307 325
308 while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 ) 326 while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector, &is_partial ) ) != -1 )
309 http_sendiovecdata( sock, &ws, iovec_entries, iovector ); 327 http_sendiovecdata( sock, &ws, iovec_entries, iovector, is_partial );
310 328
311 while( ( sock = io_canwrite( ) ) != -1 ) 329 while( ( sock = io_canwrite( ) ) != -1 )
312 handle_write( sock ); 330 handle_write( sock );
@@ -318,9 +336,6 @@ static void * server_mainloop( void * args ) {
318 } 336 }
319 337
320 livesync_ticker(); 338 livesync_ticker();
321
322 /* Enforce setting the clock */
323 signal_handler( SIGALRM );
324 } 339 }
325 return 0; 340 return 0;
326} 341}
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index d7d3518..fafd83c 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -36,9 +36,9 @@
36#define OT_SCRAPE_MAXENTRYLEN 256 36#define OT_SCRAPE_MAXENTRYLEN 256
37 37
38/* Forward declaration */ 38/* Forward declaration */
39static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); 39static void fullscrape_make( int taskid, ot_tasktype mode);
40#ifdef WANT_COMPRESSION_GZIP 40#ifdef WANT_COMPRESSION_GZIP
41static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); 41static void fullscrape_make_gzip( int taskid, ot_tasktype mode);
42#endif 42#endif
43 43
44/* Converter function from memory to human readable hex strings 44/* Converter function from memory to human readable hex strings
@@ -49,9 +49,6 @@ static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=
49 It grabs tasks from mutex_tasklist and delivers results back 49 It grabs tasks from mutex_tasklist and delivers results back
50*/ 50*/
51static void * fullscrape_worker( void * args ) { 51static void * fullscrape_worker( void * args ) {
52 int iovec_entries;
53 struct iovec *iovector;
54
55 (void) args; 52 (void) args;
56 53
57 while( g_opentracker_running ) { 54 while( g_opentracker_running ) {
@@ -59,12 +56,11 @@ static void * fullscrape_worker( void * args ) {
59 ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); 56 ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
60#ifdef WANT_COMPRESSION_GZIP 57#ifdef WANT_COMPRESSION_GZIP
61 if (tasktype & TASK_FLAG_GZIP) 58 if (tasktype & TASK_FLAG_GZIP)
62 fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); 59 fullscrape_make_gzip( taskid, tasktype );
63 else 60 else
64#endif 61#endif
65 fullscrape_make( &iovec_entries, &iovector, tasktype ); 62 fullscrape_make( taskid, tasktype );
66 if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) 63 mutex_workqueue_pushchunked( taskid, NULL );
67 iovec_free( &iovec_entries, &iovector );
68 } 64 }
69 return NULL; 65 return NULL;
70} 66}
@@ -123,14 +119,13 @@ static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torre
123 return r; 119 return r;
124} 120}
125 121
126static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { 122static void fullscrape_make( int taskid, ot_tasktype mode ) {
127 int bucket; 123 int bucket;
128 char *r, *re; 124 char *r, *re;
125 struct iovec iovector = { NULL, 0 };
129 126
130 /* Setup return vector... */ 127 /* Setup return vector... */
131 *iovec_entries = 0; 128 r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
132 *iovector = NULL;
133 r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE );
134 if( !r ) 129 if( !r )
135 return; 130 return;
136 131
@@ -152,8 +147,14 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
152 r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash ); 147 r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash );
153 148
154 if( r > re) { 149 if( r > re) {
150 iovector.iov_len = r - (char *)iovector.iov_base;
151
152 if (mutex_workqueue_pushchunked(taskid, &iovector) ) {
153 free(iovector.iov_base);
154 return mutex_bucket_unlock( bucket, 0 );
155 }
155 /* Allocate a fresh output buffer at the end of our buffers list */ 156 /* Allocate a fresh output buffer at the end of our buffers list */
156 r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); 157 r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
157 if( !r ) 158 if( !r )
158 return mutex_bucket_unlock( bucket, 0 ); 159 return mutex_bucket_unlock( bucket, 0 );
159 160
@@ -174,7 +175,9 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
174 r += sprintf( r, "ee" ); 175 r += sprintf( r, "ee" );
175 176
176 /* Release unused memory in current output buffer */ 177 /* Release unused memory in current output buffer */
177 iovec_fixlast( iovec_entries, iovector, r ); 178 iovector.iov_len = r - (char *)iovector.iov_base;
179 if( mutex_workqueue_pushchunked(taskid, &iovector) )
180 free(iovector.iov_base);
178} 181}
179 182
180#ifdef WANT_COMPRESSION_GZIP 183#ifdef WANT_COMPRESSION_GZIP
diff --git a/ot_http.c b/ot_http.c
index 61843a8..edcfadb 100644
--- a/ot_http.c
+++ b/ot_http.c
@@ -121,9 +121,10 @@ ssize_t http_issue_error( const int64 sock, struct ot_workstruct *ws, int code )
121 return ws->reply_size = -2; 121 return ws->reply_size = -2;
122} 122}
123 123
124ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ) { 124ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial ) {
125 struct http_data *cookie = io_getcookie( sock ); 125 struct http_data *cookie = io_getcookie( sock );
126 char *header; 126 char *header;
127 const char *encoding = "";
127 int i; 128 int i;
128 size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector ); 129 size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector );
129 tai6464 t; 130 tai6464 t;
@@ -140,54 +141,72 @@ ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iove
140 /* If we came here, wait for the answer is over */ 141 /* If we came here, wait for the answer is over */
141 cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; 142 cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK;
142 143
143 /* Our answers never are 0 vectors. Return an error. */ 144fprintf(stderr, "http_sendiovecdata sending %d iovec entries found cookie->batch == %p\n", iovec_entries, cookie->batch);
144 if( !iovec_entries ) {
145 HTTPERROR_500;
146 }
147 145
148 /* Prepare space for http header */ 146 if( iovec_entries ) {
149 header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING );
150 if( !header ) {
151 iovec_free( &iovec_entries, &iovector );
152 HTTPERROR_500;
153 }
154 147
155 if( cookie->flag & STRUCT_HTTP_FLAG_GZIP ) 148 /* Prepare space for http header */
156 header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: gzip\r\nContent-Length: %zd\r\n\r\n", size ); 149 header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING );
157 else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 ) 150 if( !header ) {
158 header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: bzip2\r\nContent-Length: %zd\r\n\r\n", size ); 151 iovec_free( &iovec_entries, &iovector );
159 else 152 HTTPERROR_500;
160 header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size ); 153 }
161 154
162 if (!cookie->batch ) { 155 if( cookie->flag & STRUCT_HTTP_FLAG_GZIP )
163 cookie->batch = malloc( sizeof(io_batch) ); 156 encoding = "Content-Encoding: gzip\r\n";
164 memset( cookie->batch, 0, sizeof(io_batch) ); 157 else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 )
165 cookie->batches = 1; 158 encoding = "Content-Encoding: bzip2\r\n";
166 } 159
167 iob_addbuf_free( cookie->batch, header, header_size ); 160 if( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED) )
161 header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sContent-Length: %zd\r\n\r\n", encoding, size );
162 else {
163 if ( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )) {
164 header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sTransfer-Encoding: chunked\r\n\r\n%zx\r\n", encoding, size );
165 cookie->flag |= STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER;
166 } else
167 header_size = sprintf( header, "%zx\r\n", size );
168 }
168 169
169 /* Split huge iovectors into separate io_batches */ 170 if (!cookie->batch ) {
170 for( i=0; i<iovec_entries; ++i ) { 171 cookie->batch = malloc( sizeof(io_batch) );
171 io_batch *current = cookie->batch + cookie->batches - 1; 172 memset( cookie->batch, 0, sizeof(io_batch) );
173 cookie->batches = 1;
174 }
175 iob_addbuf_free( cookie->batch, header, header_size );
172 176
173 /* If the current batch's limit is reached, try to reallocate a new batch to work on */ 177 /* Split huge iovectors into separate io_batches */
174 if( current->bytesleft > OT_BATCH_LIMIT ) { 178 for( i=0; i<iovec_entries; ++i ) {
175 io_batch * new_batch = realloc( current, (cookie->batches + 1) * sizeof(io_batch) ); 179 io_batch *current = cookie->batch + cookie->batches - 1;
180
181 /* If the current batch's limit is reached, try to reallocate a new batch to work on */
182 if( current->bytesleft > OT_BATCH_LIMIT ) {
183fprintf(stderr, "http_sendiovecdata found batch above limit: %zd\n", current->bytesleft);
184 io_batch * new_batch = realloc( cookie->batch, (cookie->batches + 1) * sizeof(io_batch) );
176 if( new_batch ) { 185 if( new_batch ) {
177 cookie->batches++; 186 cookie->batch = new_batch;
178 current = cookie->batch = new_batch; 187 current = cookie->batch + cookie->batches++;
179 memset( current, 0, sizeof(io_batch) ); 188 memset( current, 0, sizeof(io_batch) );
180 } 189 }
190 }
191fprintf(stderr, "http_sendiovecdata calling iob_addbuf_free with %zd\n", iovector[i].iov_len);
192 iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len );
181 } 193 }
194 free( iovector );
195 if ( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )
196 iob_addbuf(cookie->batch + cookie->batches - 1, "\r\n", 2);
197 }
182 198
183 iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len ); 199 if ((cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER) && cookie->batch && !is_partial) {
200fprintf(stderr, "http_sendiovecdata adds a terminating 0 size buffer to batch\n");
201 iob_addbuf(cookie->batch + cookie->batches - 1, "0\r\n\r\n", 5);
202 cookie->flag &= ~STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER;
184 } 203 }
185 free( iovector );
186 204
187 /* writeable sockets timeout after 10 minutes */ 205 /* writeable sockets timeout after 10 minutes */
188 taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND ); 206 taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND );
189 io_timeout( sock, t ); 207 io_timeout( sock, t );
190 io_dontwantread( sock ); 208 io_dontwantread( sock );
209fprintf (stderr, "http_sendiovecdata marks socket %lld as wantwrite\n", sock);
191 io_wantwrite( sock ); 210 io_wantwrite( sock );
192 return 0; 211 return 0;
193} 212}
@@ -254,7 +273,7 @@ static const ot_keywords keywords_format[] =
254#endif 273#endif
255#endif 274#endif
256 /* Pass this task to the worker thread */ 275 /* Pass this task to the worker thread */
257 cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; 276 cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED;
258 277
259 /* Clients waiting for us should not easily timeout */ 278 /* Clients waiting for us should not easily timeout */
260 taia_uint( &t, 0 ); io_timeout( sock, t ); 279 taia_uint( &t, 0 ); io_timeout( sock, t );
@@ -278,7 +297,7 @@ static const ot_keywords keywords_format[] =
278} 297}
279 298
280#ifdef WANT_MODEST_FULLSCRAPES 299#ifdef WANT_MODEST_FULLSCRAPES
281static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER; 300static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER;
282static ot_vector g_modest_fullscrape_timeouts; 301static ot_vector g_modest_fullscrape_timeouts;
283typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log; 302typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log;
284#endif 303#endif
@@ -325,7 +344,7 @@ static ssize_t http_handle_fullscrape( const int64 sock, struct ot_workstruct *w
325#endif 344#endif
326 345
327 /* Pass this task to the worker thread */ 346 /* Pass this task to the worker thread */
328 cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; 347 cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED;
329 /* Clients waiting for us should not easily timeout */ 348 /* Clients waiting for us should not easily timeout */
330 taia_uint( &t, 0 ); io_timeout( sock, t ); 349 taia_uint( &t, 0 ); io_timeout( sock, t );
331 fullscrape_deliver( sock, TASK_FULLSCRAPE | format ); 350 fullscrape_deliver( sock, TASK_FULLSCRAPE | format );
diff --git a/ot_http.h b/ot_http.h
index 40161d8..a63e3d3 100644
--- a/ot_http.h
+++ b/ot_http.h
@@ -7,9 +7,11 @@
7#define OT_HTTP_H__ 7#define OT_HTTP_H__
8 8
9typedef enum { 9typedef enum {
10 STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, 10 STRUCT_HTTP_FLAG_WAITINGFORTASK = 1,
11 STRUCT_HTTP_FLAG_GZIP = 2, 11 STRUCT_HTTP_FLAG_GZIP = 2,
12 STRUCT_HTTP_FLAG_BZIP2 = 4 12 STRUCT_HTTP_FLAG_BZIP2 = 4,
13 STRUCT_HTTP_FLAG_CHUNKED = 8,
14 STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16
13} STRUCT_HTTP_FLAG; 15} STRUCT_HTTP_FLAG;
14 16
15struct http_data { 17struct http_data {
@@ -21,7 +23,7 @@ struct http_data {
21}; 23};
22 24
23ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws ); 25ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws );
24ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ); 26ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial );
25ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code ); 27ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code );
26 28
27extern char *g_stats_path; 29extern char *g_stats_path;
diff --git a/ot_iovec.c b/ot_iovec.c
index fec3912..f9567a9 100644
--- a/ot_iovec.c
+++ b/ot_iovec.c
@@ -35,6 +35,26 @@ void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_al
35 return new_data; 35 return new_data;
36} 36}
37 37
38void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector) {
39 int new_entries = *iovec_entries + 1;
40 struct iovec *new_vec = realloc( *iovector, new_entries * sizeof( struct iovec ) );
41 if( !new_vec )
42 return NULL;
43
44 /* Take over data from appended iovec */
45 new_vec[*iovec_entries].iov_base = append_iovector->iov_base;
46 new_vec[*iovec_entries].iov_len = append_iovector->iov_len;
47
48 append_iovector->iov_base = NULL;
49 append_iovector->iov_len = 0;
50
51 *iovector = new_vec;
52 *iovec_entries = new_entries;
53
54 return new_vec;
55}
56
57
38void iovec_free( int *iovec_entries, struct iovec **iovector ) { 58void iovec_free( int *iovec_entries, struct iovec **iovector ) {
39 int i; 59 int i;
40 for( i=0; i<*iovec_entries; ++i ) 60 for( i=0; i<*iovec_entries; ++i )
@@ -64,7 +84,6 @@ void *iovec_fix_increase_or_free( int *iovec_entries, struct iovec **iovector, v
64 return new_data; 84 return new_data;
65} 85}
66 86
67
68size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) { 87size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) {
69 size_t length = 0; 88 size_t length = 0;
70 int i; 89 int i;
diff --git a/ot_iovec.h b/ot_iovec.h
index e48008e..bb953c3 100644
--- a/ot_iovec.h
+++ b/ot_iovec.h
@@ -9,6 +9,7 @@
9#include <sys/uio.h> 9#include <sys/uio.h>
10 10
11void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc ); 11void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc );
12void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector );
12void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr ); 13void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr );
13void iovec_free( int *iovec_entries, struct iovec **iovector ); 14void iovec_free( int *iovec_entries, struct iovec **iovector );
14 15
diff --git a/ot_mutex.c b/ot_mutex.c
index 497b1af..b61b915 100644
--- a/ot_mutex.c
+++ b/ot_mutex.c
@@ -17,6 +17,7 @@
17 17
18/* Opentracker */ 18/* Opentracker */
19#include "trackerlogic.h" 19#include "trackerlogic.h"
20#include "ot_iovec.h"
20#include "ot_mutex.h" 21#include "ot_mutex.h"
21#include "ot_stats.h" 22#include "ot_stats.h"
22 23
@@ -194,23 +195,66 @@ int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iove
194 return task ? 0 : -1; 195 return task ? 0 : -1;
195} 196}
196 197
197int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) { 198int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
199 struct ot_task * task;
200 const char byte = 'o';
201
202 /* Want exclusive access to tasklist */
203 pthread_mutex_lock( &tasklist_mutex );
204
205 for (task = tasklist; task; task = task->next)
206 if (task->taskid == taskid) {
207 if( iovec ) {
208fprintf(stderr, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid);
209 if (!iovec_append(&task->iovec_entries, &task->iovec, iovec) )
210 return -1;
211 task->tasktype = TASK_DONE_PARTIAL;
212 } else {
213fprintf(stderr, "mutex_workqueue_pushchunked finished taskid %d\n", taskid);
214 task->tasktype = TASK_DONE;
215 }
216 break;
217 }
218
219 /* Release lock */
220 pthread_mutex_unlock( &tasklist_mutex );
221
222 io_trywrite( g_self_pipe[1], &byte, 1 );
223if(!task)
224fprintf(stderr, "mutex_workqueue_pushchunked taskid %d not found\n", taskid);
225
226 /* Indicate whether the worker has to throw away results */
227 return task ? 0 : -1;
228}
229
230
231int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) {
198 struct ot_task ** task; 232 struct ot_task ** task;
199 int64 sock = -1; 233 int64 sock = -1;
200 234
235 *is_partial = 0;
236
201 /* Want exclusive access to tasklist */ 237 /* Want exclusive access to tasklist */
202 pthread_mutex_lock( &tasklist_mutex ); 238 pthread_mutex_lock( &tasklist_mutex );
203 239
204 for (task = &tasklist; *task; task = &((*task)->next)) 240 for (task = &tasklist; *task; task = &((*task)->next))
205 if ((*task)->tasktype == TASK_DONE) { 241 if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) {
206 struct ot_task *ptask = *task; 242 struct ot_task *ptask = *task;
207 243fprintf(stderr, "Got task %d type %d with %d entries\n", (*task)->taskid, (*task)->tasktype, ptask->iovec_entries);
208 *iovec_entries = ptask->iovec_entries; 244 *iovec_entries = ptask->iovec_entries;
209 *iovec = ptask->iovec; 245 *iovec = ptask->iovec;
210 sock = ptask->sock; 246 sock = ptask->sock;
211 247
212 *task = ptask->next; 248 if ((*task)->tasktype == TASK_DONE) {
213 free( ptask ); 249 *task = ptask->next;
250 free( ptask );
251 } else {
252 ptask->iovec_entries = 0;
253 ptask->iovec = NULL;
254 *is_partial = 1;
255 /* Prevent task from showing up immediately again unless new data was added */
256 (*task)->tasktype = TASK_FULLSCRAPE;
257 }
214 break; 258 break;
215 } 259 }
216 260
diff --git a/ot_mutex.h b/ot_mutex.h
index fdb08a1..9eb17e5 100644
--- a/ot_mutex.h
+++ b/ot_mutex.h
@@ -54,9 +54,11 @@ typedef enum {
54 TASK_DMEM = 0x0300, 54 TASK_DMEM = 0x0300,
55 55
56 TASK_DONE = 0x0f00, 56 TASK_DONE = 0x0f00,
57 TASK_DONE_PARTIAL = 0x0f01,
57 58
58 TASK_FLAG_GZIP = 0x1000, 59 TASK_FLAG_GZIP = 0x1000,
59 TASK_FLAG_BZIP2 = 0x2000, 60 TASK_FLAG_BZIP2 = 0x2000,
61 TASK_FLAG_CHUNKED = 0x4000,
60 62
61 TASK_TASK_MASK = 0x0fff, 63 TASK_TASK_MASK = 0x0fff,
62 TASK_CLASS_MASK = 0x0f00, 64 TASK_CLASS_MASK = 0x0f00,
@@ -70,6 +72,7 @@ void mutex_workqueue_canceltask( int64 sock );
70void mutex_workqueue_pushsuccess( ot_taskid taskid ); 72void mutex_workqueue_pushsuccess( ot_taskid taskid );
71ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ); 73ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype );
72int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector ); 74int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector );
73int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector ); 75int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec);
76int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector, int *is_partial );
74 77
75#endif 78#endif
diff --git a/trackerlogic.c b/trackerlogic.c
index 6fd2724..13d2741 100644
--- a/trackerlogic.c
+++ b/trackerlogic.c
@@ -517,6 +517,29 @@ size_t peer_size_from_peer6(ot_peer6 *peer) {
517 return OT_PEER_SIZE4; 517 return OT_PEER_SIZE4;
518} 518}
519 519
520void trackerlogic_add_random_torrents(size_t amount) {
521 struct ot_workstruct ws;
522 memset( &ws, 0, sizeof(ws) );
523
524 ws.inbuf=malloc(G_INBUF_SIZE);
525 ws.outbuf=malloc(G_OUTBUF_SIZE);
526 ws.reply=ws.outbuf;
527 ws.hash=ws.inbuf;
528
529 while( amount-- ) {
530 arc4random_buf(ws.hash, sizeof(ot_hash));
531 arc4random_buf(&ws.peer, sizeof(ws.peer));
532
533 OT_PEERFLAG(ws.peer) &= PEER_FLAG_SEEDING | PEER_FLAG_COMPLETED | PEER_FLAG_STOPPED;
534
535 add_peer_to_torrent_and_return_peers( FLAG_TCP, &ws, 1 );
536 }
537
538 free(ws.inbuf);
539 free(ws.outbuf);
540}
541
542
520void exerr( char * message ) { 543void exerr( char * message ) {
521 fprintf( stderr, "%s\n", message ); 544 fprintf( stderr, "%s\n", message );
522 exit( 111 ); 545 exit( 111 );
diff --git a/trackerlogic.h b/trackerlogic.h
index 9f5886d..bc488c9 100644
--- a/trackerlogic.h
+++ b/trackerlogic.h
@@ -190,6 +190,7 @@ size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws );
190size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply ); 190size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply );
191size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply ); 191size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply );
192void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count ); 192void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count );
193void trackerlogic_add_random_torrents(size_t amount);
193 194
194/* torrent iterator */ 195/* torrent iterator */
195void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ); 196void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data );