diff options
author | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-13 00:47:29 +0200 |
---|---|---|
committer | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-13 00:47:29 +0200 |
commit | 1a70d9f9ef81ac1b5e843ac71f3538f7845e03ae (patch) | |
tree | 20a20077503c01dc024e88a6a8d82bf89faf22fd | |
parent | 301faeb10c5994a6fd31adc5f0b4f8f2b5c23502 (diff) |
First shot on chunked transfers
-rw-r--r-- | opentracker.c | 39 | ||||
-rw-r--r-- | ot_fullscrape.c | 33 | ||||
-rw-r--r-- | ot_http.c | 93 | ||||
-rw-r--r-- | ot_http.h | 10 | ||||
-rw-r--r-- | ot_iovec.c | 21 | ||||
-rw-r--r-- | ot_iovec.h | 1 | ||||
-rw-r--r-- | ot_mutex.c | 54 | ||||
-rw-r--r-- | ot_mutex.h | 5 | ||||
-rw-r--r-- | trackerlogic.c | 23 | ||||
-rw-r--r-- | trackerlogic.h | 1 |
10 files changed, 205 insertions, 75 deletions
diff --git a/opentracker.c b/opentracker.c index 7c67f26..73a3ff3 100644 --- a/opentracker.c +++ b/opentracker.c | |||
@@ -79,6 +79,7 @@ static void defaul_signal_handlers( void ) { | |||
79 | sigaddset (&signal_mask, SIGPIPE); | 79 | sigaddset (&signal_mask, SIGPIPE); |
80 | sigaddset (&signal_mask, SIGHUP); | 80 | sigaddset (&signal_mask, SIGHUP); |
81 | sigaddset (&signal_mask, SIGINT); | 81 | sigaddset (&signal_mask, SIGINT); |
82 | sigaddset (&signal_mask, SIGALRM); | ||
82 | pthread_sigmask (SIG_BLOCK, &signal_mask, NULL); | 83 | pthread_sigmask (SIG_BLOCK, &signal_mask, NULL); |
83 | } | 84 | } |
84 | 85 | ||
@@ -90,7 +91,7 @@ static void install_signal_handlers( void ) { | |||
90 | sa.sa_handler = signal_handler; | 91 | sa.sa_handler = signal_handler; |
91 | sigemptyset(&sa.sa_mask); | 92 | sigemptyset(&sa.sa_mask); |
92 | sa.sa_flags = SA_RESTART; | 93 | sa.sa_flags = SA_RESTART; |
93 | if ((sigaction(SIGINT, &sa, NULL) == -1)) | 94 | if ((sigaction(SIGINT, &sa, NULL) == -1) || (sigaction(SIGALRM, &sa, NULL) == -1) ) |
94 | panic( "install_signal_handlers" ); | 95 | panic( "install_signal_handlers" ); |
95 | 96 | ||
96 | sigaddset (&signal_mask, SIGINT); | 97 | sigaddset (&signal_mask, SIGINT); |
@@ -208,15 +209,23 @@ static void handle_read( const int64 sock, struct ot_workstruct *ws ) { | |||
208 | static void handle_write( const int64 sock ) { | 209 | static void handle_write( const int64 sock ) { |
209 | struct http_data* cookie=io_getcookie( sock ); | 210 | struct http_data* cookie=io_getcookie( sock ); |
210 | size_t i; | 211 | size_t i; |
212 | int chunked = 0; | ||
211 | 213 | ||
212 | /* Look for the first io_batch still containing bytes to write */ | 214 | /* Look for the first io_batch still containing bytes to write */ |
213 | if( cookie ) | 215 | if( cookie ) { |
214 | for( i = 0; i < cookie->batches; ++i ) | 216 | if( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER ) |
217 | chunked = 1; | ||
218 | |||
219 | for( i = 0; i < cookie->batches; ++i ) { | ||
220 | fprintf(stderr, "handle_write inspects batch %d of %d (bytes left: %d)\n", i, cookie->batches, cookie->batch[i].bytesleft); | ||
215 | if( cookie->batch[i].bytesleft ) { | 221 | if( cookie->batch[i].bytesleft ) { |
216 | int64 res = iob_send( sock, cookie->batch + i ); | 222 | int64 res = iob_send( sock, cookie->batch + i ); |
217 | 223 | ||
218 | if( res == -3 ) | 224 | fprintf(stderr, "handle_write yields res %lld when trying to iob_send\n", res); |
219 | break; | 225 | if( res == -3 ) { |
226 | handle_dead( sock ); | ||
227 | return; | ||
228 | } | ||
220 | 229 | ||
221 | if( !cookie->batch[i].bytesleft ) | 230 | if( !cookie->batch[i].bytesleft ) |
222 | continue; | 231 | continue; |
@@ -224,8 +233,17 @@ static void handle_write( const int64 sock ) { | |||
224 | if( res == -1 || res > 0 || i < cookie->batches - 1 ) | 233 | if( res == -1 || res > 0 || i < cookie->batches - 1 ) |
225 | return; | 234 | return; |
226 | } | 235 | } |
236 | } | ||
237 | } | ||
227 | 238 | ||
228 | handle_dead( sock ); | 239 | /* In a chunked transfer after all batches accumulated have been sent, wait for the next one */ |
240 | if( chunked ) { | ||
241 | //fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => dont want write on sock %lld\n", sock); | ||
242 | //io_dontwantwrite( sock ); | ||
243 | } else { | ||
244 | fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => handle dead on sock %lld\n", sock); | ||
245 | handle_dead( sock ); | ||
246 | } | ||
229 | } | 247 | } |
230 | 248 | ||
231 | static void handle_accept( const int64 serversocket ) { | 249 | static void handle_accept( const int64 serversocket ) { |
@@ -266,7 +284,7 @@ static void * server_mainloop( void * args ) { | |||
266 | struct ot_workstruct ws; | 284 | struct ot_workstruct ws; |
267 | time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL; | 285 | time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL; |
268 | struct iovec *iovector; | 286 | struct iovec *iovector; |
269 | int iovec_entries; | 287 | int iovec_entries, is_partial; |
270 | 288 | ||
271 | (void)args; | 289 | (void)args; |
272 | 290 | ||
@@ -305,8 +323,8 @@ static void * server_mainloop( void * args ) { | |||
305 | handle_read( sock, &ws ); | 323 | handle_read( sock, &ws ); |
306 | } | 324 | } |
307 | 325 | ||
308 | while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 ) | 326 | while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector, &is_partial ) ) != -1 ) |
309 | http_sendiovecdata( sock, &ws, iovec_entries, iovector ); | 327 | http_sendiovecdata( sock, &ws, iovec_entries, iovector, is_partial ); |
310 | 328 | ||
311 | while( ( sock = io_canwrite( ) ) != -1 ) | 329 | while( ( sock = io_canwrite( ) ) != -1 ) |
312 | handle_write( sock ); | 330 | handle_write( sock ); |
@@ -318,9 +336,6 @@ static void * server_mainloop( void * args ) { | |||
318 | } | 336 | } |
319 | 337 | ||
320 | livesync_ticker(); | 338 | livesync_ticker(); |
321 | |||
322 | /* Enforce setting the clock */ | ||
323 | signal_handler( SIGALRM ); | ||
324 | } | 339 | } |
325 | return 0; | 340 | return 0; |
326 | } | 341 | } |
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index d7d3518..fafd83c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
@@ -36,9 +36,9 @@ | |||
36 | #define OT_SCRAPE_MAXENTRYLEN 256 | 36 | #define OT_SCRAPE_MAXENTRYLEN 256 |
37 | 37 | ||
38 | /* Forward declaration */ | 38 | /* Forward declaration */ |
39 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 39 | static void fullscrape_make( int taskid, ot_tasktype mode); |
40 | #ifdef WANT_COMPRESSION_GZIP | 40 | #ifdef WANT_COMPRESSION_GZIP |
41 | static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 41 | static void fullscrape_make_gzip( int taskid, ot_tasktype mode); |
42 | #endif | 42 | #endif |
43 | 43 | ||
44 | /* Converter function from memory to human readable hex strings | 44 | /* Converter function from memory to human readable hex strings |
@@ -49,9 +49,6 @@ static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e= | |||
49 | It grabs tasks from mutex_tasklist and delivers results back | 49 | It grabs tasks from mutex_tasklist and delivers results back |
50 | */ | 50 | */ |
51 | static void * fullscrape_worker( void * args ) { | 51 | static void * fullscrape_worker( void * args ) { |
52 | int iovec_entries; | ||
53 | struct iovec *iovector; | ||
54 | |||
55 | (void) args; | 52 | (void) args; |
56 | 53 | ||
57 | while( g_opentracker_running ) { | 54 | while( g_opentracker_running ) { |
@@ -59,12 +56,11 @@ static void * fullscrape_worker( void * args ) { | |||
59 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); | 56 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); |
60 | #ifdef WANT_COMPRESSION_GZIP | 57 | #ifdef WANT_COMPRESSION_GZIP |
61 | if (tasktype & TASK_FLAG_GZIP) | 58 | if (tasktype & TASK_FLAG_GZIP) |
62 | fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); | 59 | fullscrape_make_gzip( taskid, tasktype ); |
63 | else | 60 | else |
64 | #endif | 61 | #endif |
65 | fullscrape_make( &iovec_entries, &iovector, tasktype ); | 62 | fullscrape_make( taskid, tasktype ); |
66 | if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) | 63 | mutex_workqueue_pushchunked( taskid, NULL ); |
67 | iovec_free( &iovec_entries, &iovector ); | ||
68 | } | 64 | } |
69 | return NULL; | 65 | return NULL; |
70 | } | 66 | } |
@@ -123,14 +119,13 @@ static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torre | |||
123 | return r; | 119 | return r; |
124 | } | 120 | } |
125 | 121 | ||
126 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { | 122 | static void fullscrape_make( int taskid, ot_tasktype mode ) { |
127 | int bucket; | 123 | int bucket; |
128 | char *r, *re; | 124 | char *r, *re; |
125 | struct iovec iovector = { NULL, 0 }; | ||
129 | 126 | ||
130 | /* Setup return vector... */ | 127 | /* Setup return vector... */ |
131 | *iovec_entries = 0; | 128 | r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); |
132 | *iovector = NULL; | ||
133 | r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); | ||
134 | if( !r ) | 129 | if( !r ) |
135 | return; | 130 | return; |
136 | 131 | ||
@@ -152,8 +147,14 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
152 | r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash ); | 147 | r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash ); |
153 | 148 | ||
154 | if( r > re) { | 149 | if( r > re) { |
150 | iovector.iov_len = r - (char *)iovector.iov_base; | ||
151 | |||
152 | if (mutex_workqueue_pushchunked(taskid, &iovector) ) { | ||
153 | free(iovector.iov_base); | ||
154 | return mutex_bucket_unlock( bucket, 0 ); | ||
155 | } | ||
155 | /* Allocate a fresh output buffer at the end of our buffers list */ | 156 | /* Allocate a fresh output buffer at the end of our buffers list */ |
156 | r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); | 157 | r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE ); |
157 | if( !r ) | 158 | if( !r ) |
158 | return mutex_bucket_unlock( bucket, 0 ); | 159 | return mutex_bucket_unlock( bucket, 0 ); |
159 | 160 | ||
@@ -174,7 +175,9 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
174 | r += sprintf( r, "ee" ); | 175 | r += sprintf( r, "ee" ); |
175 | 176 | ||
176 | /* Release unused memory in current output buffer */ | 177 | /* Release unused memory in current output buffer */ |
177 | iovec_fixlast( iovec_entries, iovector, r ); | 178 | iovector.iov_len = r - (char *)iovector.iov_base; |
179 | if( mutex_workqueue_pushchunked(taskid, &iovector) ) | ||
180 | free(iovector.iov_base); | ||
178 | } | 181 | } |
179 | 182 | ||
180 | #ifdef WANT_COMPRESSION_GZIP | 183 | #ifdef WANT_COMPRESSION_GZIP |
@@ -121,9 +121,10 @@ ssize_t http_issue_error( const int64 sock, struct ot_workstruct *ws, int code ) | |||
121 | return ws->reply_size = -2; | 121 | return ws->reply_size = -2; |
122 | } | 122 | } |
123 | 123 | ||
124 | ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ) { | 124 | ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial ) { |
125 | struct http_data *cookie = io_getcookie( sock ); | 125 | struct http_data *cookie = io_getcookie( sock ); |
126 | char *header; | 126 | char *header; |
127 | const char *encoding = ""; | ||
127 | int i; | 128 | int i; |
128 | size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector ); | 129 | size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector ); |
129 | tai6464 t; | 130 | tai6464 t; |
@@ -140,54 +141,72 @@ ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iove | |||
140 | /* If we came here, wait for the answer is over */ | 141 | /* If we came here, wait for the answer is over */ |
141 | cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; | 142 | cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; |
142 | 143 | ||
143 | /* Our answers never are 0 vectors. Return an error. */ | 144 | fprintf(stderr, "http_sendiovecdata sending %d iovec entries found cookie->batch == %p\n", iovec_entries, cookie->batch); |
144 | if( !iovec_entries ) { | ||
145 | HTTPERROR_500; | ||
146 | } | ||
147 | 145 | ||
148 | /* Prepare space for http header */ | 146 | if( iovec_entries ) { |
149 | header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING ); | ||
150 | if( !header ) { | ||
151 | iovec_free( &iovec_entries, &iovector ); | ||
152 | HTTPERROR_500; | ||
153 | } | ||
154 | 147 | ||
155 | if( cookie->flag & STRUCT_HTTP_FLAG_GZIP ) | 148 | /* Prepare space for http header */ |
156 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: gzip\r\nContent-Length: %zd\r\n\r\n", size ); | 149 | header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING ); |
157 | else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 ) | 150 | if( !header ) { |
158 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: bzip2\r\nContent-Length: %zd\r\n\r\n", size ); | 151 | iovec_free( &iovec_entries, &iovector ); |
159 | else | 152 | HTTPERROR_500; |
160 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size ); | 153 | } |
161 | 154 | ||
162 | if (!cookie->batch ) { | 155 | if( cookie->flag & STRUCT_HTTP_FLAG_GZIP ) |
163 | cookie->batch = malloc( sizeof(io_batch) ); | 156 | encoding = "Content-Encoding: gzip\r\n"; |
164 | memset( cookie->batch, 0, sizeof(io_batch) ); | 157 | else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 ) |
165 | cookie->batches = 1; | 158 | encoding = "Content-Encoding: bzip2\r\n"; |
166 | } | 159 | |
167 | iob_addbuf_free( cookie->batch, header, header_size ); | 160 | if( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED) ) |
161 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sContent-Length: %zd\r\n\r\n", encoding, size ); | ||
162 | else { | ||
163 | if ( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )) { | ||
164 | header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sTransfer-Encoding: chunked\r\n\r\n%zx\r\n", encoding, size ); | ||
165 | cookie->flag |= STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER; | ||
166 | } else | ||
167 | header_size = sprintf( header, "%zx\r\n", size ); | ||
168 | } | ||
168 | 169 | ||
169 | /* Split huge iovectors into separate io_batches */ | 170 | if (!cookie->batch ) { |
170 | for( i=0; i<iovec_entries; ++i ) { | 171 | cookie->batch = malloc( sizeof(io_batch) ); |
171 | io_batch *current = cookie->batch + cookie->batches - 1; | 172 | memset( cookie->batch, 0, sizeof(io_batch) ); |
173 | cookie->batches = 1; | ||
174 | } | ||
175 | iob_addbuf_free( cookie->batch, header, header_size ); | ||
172 | 176 | ||
173 | /* If the current batch's limit is reached, try to reallocate a new batch to work on */ | 177 | /* Split huge iovectors into separate io_batches */ |
174 | if( current->bytesleft > OT_BATCH_LIMIT ) { | 178 | for( i=0; i<iovec_entries; ++i ) { |
175 | io_batch * new_batch = realloc( current, (cookie->batches + 1) * sizeof(io_batch) ); | 179 | io_batch *current = cookie->batch + cookie->batches - 1; |
180 | |||
181 | /* If the current batch's limit is reached, try to reallocate a new batch to work on */ | ||
182 | if( current->bytesleft > OT_BATCH_LIMIT ) { | ||
183 | fprintf(stderr, "http_sendiovecdata found batch above limit: %zd\n", current->bytesleft); | ||
184 | io_batch * new_batch = realloc( cookie->batch, (cookie->batches + 1) * sizeof(io_batch) ); | ||
176 | if( new_batch ) { | 185 | if( new_batch ) { |
177 | cookie->batches++; | 186 | cookie->batch = new_batch; |
178 | current = cookie->batch = new_batch; | 187 | current = cookie->batch + cookie->batches++; |
179 | memset( current, 0, sizeof(io_batch) ); | 188 | memset( current, 0, sizeof(io_batch) ); |
180 | } | 189 | } |
190 | } | ||
191 | fprintf(stderr, "http_sendiovecdata calling iob_addbuf_free with %zd\n", iovector[i].iov_len); | ||
192 | iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len ); | ||
181 | } | 193 | } |
194 | free( iovector ); | ||
195 | if ( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER ) | ||
196 | iob_addbuf(cookie->batch + cookie->batches - 1, "\r\n", 2); | ||
197 | } | ||
182 | 198 | ||
183 | iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len ); | 199 | if ((cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER) && cookie->batch && !is_partial) { |
200 | fprintf(stderr, "http_sendiovecdata adds a terminating 0 size buffer to batch\n"); | ||
201 | iob_addbuf(cookie->batch + cookie->batches - 1, "0\r\n\r\n", 5); | ||
202 | cookie->flag &= ~STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER; | ||
184 | } | 203 | } |
185 | free( iovector ); | ||
186 | 204 | ||
187 | /* writeable sockets timeout after 10 minutes */ | 205 | /* writeable sockets timeout after 10 minutes */ |
188 | taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND ); | 206 | taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND ); |
189 | io_timeout( sock, t ); | 207 | io_timeout( sock, t ); |
190 | io_dontwantread( sock ); | 208 | io_dontwantread( sock ); |
209 | fprintf (stderr, "http_sendiovecdata marks socket %lld as wantwrite\n", sock); | ||
191 | io_wantwrite( sock ); | 210 | io_wantwrite( sock ); |
192 | return 0; | 211 | return 0; |
193 | } | 212 | } |
@@ -254,7 +273,7 @@ static const ot_keywords keywords_format[] = | |||
254 | #endif | 273 | #endif |
255 | #endif | 274 | #endif |
256 | /* Pass this task to the worker thread */ | 275 | /* Pass this task to the worker thread */ |
257 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; | 276 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED; |
258 | 277 | ||
259 | /* Clients waiting for us should not easily timeout */ | 278 | /* Clients waiting for us should not easily timeout */ |
260 | taia_uint( &t, 0 ); io_timeout( sock, t ); | 279 | taia_uint( &t, 0 ); io_timeout( sock, t ); |
@@ -278,7 +297,7 @@ static const ot_keywords keywords_format[] = | |||
278 | } | 297 | } |
279 | 298 | ||
280 | #ifdef WANT_MODEST_FULLSCRAPES | 299 | #ifdef WANT_MODEST_FULLSCRAPES |
281 | static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER; | 300 | static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER; |
282 | static ot_vector g_modest_fullscrape_timeouts; | 301 | static ot_vector g_modest_fullscrape_timeouts; |
283 | typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log; | 302 | typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log; |
284 | #endif | 303 | #endif |
@@ -325,7 +344,7 @@ static ssize_t http_handle_fullscrape( const int64 sock, struct ot_workstruct *w | |||
325 | #endif | 344 | #endif |
326 | 345 | ||
327 | /* Pass this task to the worker thread */ | 346 | /* Pass this task to the worker thread */ |
328 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; | 347 | cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED; |
329 | /* Clients waiting for us should not easily timeout */ | 348 | /* Clients waiting for us should not easily timeout */ |
330 | taia_uint( &t, 0 ); io_timeout( sock, t ); | 349 | taia_uint( &t, 0 ); io_timeout( sock, t ); |
331 | fullscrape_deliver( sock, TASK_FULLSCRAPE | format ); | 350 | fullscrape_deliver( sock, TASK_FULLSCRAPE | format ); |
@@ -7,9 +7,11 @@ | |||
7 | #define OT_HTTP_H__ | 7 | #define OT_HTTP_H__ |
8 | 8 | ||
9 | typedef enum { | 9 | typedef enum { |
10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, | 10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, |
11 | STRUCT_HTTP_FLAG_GZIP = 2, | 11 | STRUCT_HTTP_FLAG_GZIP = 2, |
12 | STRUCT_HTTP_FLAG_BZIP2 = 4 | 12 | STRUCT_HTTP_FLAG_BZIP2 = 4, |
13 | STRUCT_HTTP_FLAG_CHUNKED = 8, | ||
14 | STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16 | ||
13 | } STRUCT_HTTP_FLAG; | 15 | } STRUCT_HTTP_FLAG; |
14 | 16 | ||
15 | struct http_data { | 17 | struct http_data { |
@@ -21,7 +23,7 @@ struct http_data { | |||
21 | }; | 23 | }; |
22 | 24 | ||
23 | ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws ); | 25 | ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws ); |
24 | ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ); | 26 | ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial ); |
25 | ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code ); | 27 | ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code ); |
26 | 28 | ||
27 | extern char *g_stats_path; | 29 | extern char *g_stats_path; |
@@ -35,6 +35,26 @@ void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_al | |||
35 | return new_data; | 35 | return new_data; |
36 | } | 36 | } |
37 | 37 | ||
38 | void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector) { | ||
39 | int new_entries = *iovec_entries + 1; | ||
40 | struct iovec *new_vec = realloc( *iovector, new_entries * sizeof( struct iovec ) ); | ||
41 | if( !new_vec ) | ||
42 | return NULL; | ||
43 | |||
44 | /* Take over data from appended iovec */ | ||
45 | new_vec[*iovec_entries].iov_base = append_iovector->iov_base; | ||
46 | new_vec[*iovec_entries].iov_len = append_iovector->iov_len; | ||
47 | |||
48 | append_iovector->iov_base = NULL; | ||
49 | append_iovector->iov_len = 0; | ||
50 | |||
51 | *iovector = new_vec; | ||
52 | *iovec_entries = new_entries; | ||
53 | |||
54 | return new_vec; | ||
55 | } | ||
56 | |||
57 | |||
38 | void iovec_free( int *iovec_entries, struct iovec **iovector ) { | 58 | void iovec_free( int *iovec_entries, struct iovec **iovector ) { |
39 | int i; | 59 | int i; |
40 | for( i=0; i<*iovec_entries; ++i ) | 60 | for( i=0; i<*iovec_entries; ++i ) |
@@ -64,7 +84,6 @@ void *iovec_fix_increase_or_free( int *iovec_entries, struct iovec **iovector, v | |||
64 | return new_data; | 84 | return new_data; |
65 | } | 85 | } |
66 | 86 | ||
67 | |||
68 | size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) { | 87 | size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) { |
69 | size_t length = 0; | 88 | size_t length = 0; |
70 | int i; | 89 | int i; |
@@ -9,6 +9,7 @@ | |||
9 | #include <sys/uio.h> | 9 | #include <sys/uio.h> |
10 | 10 | ||
11 | void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc ); | 11 | void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc ); |
12 | void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector ); | ||
12 | void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr ); | 13 | void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr ); |
13 | void iovec_free( int *iovec_entries, struct iovec **iovector ); | 14 | void iovec_free( int *iovec_entries, struct iovec **iovector ); |
14 | 15 | ||
@@ -17,6 +17,7 @@ | |||
17 | 17 | ||
18 | /* Opentracker */ | 18 | /* Opentracker */ |
19 | #include "trackerlogic.h" | 19 | #include "trackerlogic.h" |
20 | #include "ot_iovec.h" | ||
20 | #include "ot_mutex.h" | 21 | #include "ot_mutex.h" |
21 | #include "ot_stats.h" | 22 | #include "ot_stats.h" |
22 | 23 | ||
@@ -194,23 +195,66 @@ int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iove | |||
194 | return task ? 0 : -1; | 195 | return task ? 0 : -1; |
195 | } | 196 | } |
196 | 197 | ||
197 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) { | 198 | int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) { |
199 | struct ot_task * task; | ||
200 | const char byte = 'o'; | ||
201 | |||
202 | /* Want exclusive access to tasklist */ | ||
203 | pthread_mutex_lock( &tasklist_mutex ); | ||
204 | |||
205 | for (task = tasklist; task; task = task->next) | ||
206 | if (task->taskid == taskid) { | ||
207 | if( iovec ) { | ||
208 | fprintf(stderr, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid); | ||
209 | if (!iovec_append(&task->iovec_entries, &task->iovec, iovec) ) | ||
210 | return -1; | ||
211 | task->tasktype = TASK_DONE_PARTIAL; | ||
212 | } else { | ||
213 | fprintf(stderr, "mutex_workqueue_pushchunked finished taskid %d\n", taskid); | ||
214 | task->tasktype = TASK_DONE; | ||
215 | } | ||
216 | break; | ||
217 | } | ||
218 | |||
219 | /* Release lock */ | ||
220 | pthread_mutex_unlock( &tasklist_mutex ); | ||
221 | |||
222 | io_trywrite( g_self_pipe[1], &byte, 1 ); | ||
223 | if(!task) | ||
224 | fprintf(stderr, "mutex_workqueue_pushchunked taskid %d not found\n", taskid); | ||
225 | |||
226 | /* Indicate whether the worker has to throw away results */ | ||
227 | return task ? 0 : -1; | ||
228 | } | ||
229 | |||
230 | |||
231 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) { | ||
198 | struct ot_task ** task; | 232 | struct ot_task ** task; |
199 | int64 sock = -1; | 233 | int64 sock = -1; |
200 | 234 | ||
235 | *is_partial = 0; | ||
236 | |||
201 | /* Want exclusive access to tasklist */ | 237 | /* Want exclusive access to tasklist */ |
202 | pthread_mutex_lock( &tasklist_mutex ); | 238 | pthread_mutex_lock( &tasklist_mutex ); |
203 | 239 | ||
204 | for (task = &tasklist; *task; task = &((*task)->next)) | 240 | for (task = &tasklist; *task; task = &((*task)->next)) |
205 | if ((*task)->tasktype == TASK_DONE) { | 241 | if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) { |
206 | struct ot_task *ptask = *task; | 242 | struct ot_task *ptask = *task; |
207 | 243 | fprintf(stderr, "Got task %d type %d with %d entries\n", (*task)->taskid, (*task)->tasktype, ptask->iovec_entries); | |
208 | *iovec_entries = ptask->iovec_entries; | 244 | *iovec_entries = ptask->iovec_entries; |
209 | *iovec = ptask->iovec; | 245 | *iovec = ptask->iovec; |
210 | sock = ptask->sock; | 246 | sock = ptask->sock; |
211 | 247 | ||
212 | *task = ptask->next; | 248 | if ((*task)->tasktype == TASK_DONE) { |
213 | free( ptask ); | 249 | *task = ptask->next; |
250 | free( ptask ); | ||
251 | } else { | ||
252 | ptask->iovec_entries = 0; | ||
253 | ptask->iovec = NULL; | ||
254 | *is_partial = 1; | ||
255 | /* Prevent task from showing up immediately again unless new data was added */ | ||
256 | (*task)->tasktype = TASK_FULLSCRAPE; | ||
257 | } | ||
214 | break; | 258 | break; |
215 | } | 259 | } |
216 | 260 | ||
@@ -54,9 +54,11 @@ typedef enum { | |||
54 | TASK_DMEM = 0x0300, | 54 | TASK_DMEM = 0x0300, |
55 | 55 | ||
56 | TASK_DONE = 0x0f00, | 56 | TASK_DONE = 0x0f00, |
57 | TASK_DONE_PARTIAL = 0x0f01, | ||
57 | 58 | ||
58 | TASK_FLAG_GZIP = 0x1000, | 59 | TASK_FLAG_GZIP = 0x1000, |
59 | TASK_FLAG_BZIP2 = 0x2000, | 60 | TASK_FLAG_BZIP2 = 0x2000, |
61 | TASK_FLAG_CHUNKED = 0x4000, | ||
60 | 62 | ||
61 | TASK_TASK_MASK = 0x0fff, | 63 | TASK_TASK_MASK = 0x0fff, |
62 | TASK_CLASS_MASK = 0x0f00, | 64 | TASK_CLASS_MASK = 0x0f00, |
@@ -70,6 +72,7 @@ void mutex_workqueue_canceltask( int64 sock ); | |||
70 | void mutex_workqueue_pushsuccess( ot_taskid taskid ); | 72 | void mutex_workqueue_pushsuccess( ot_taskid taskid ); |
71 | ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ); | 73 | ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ); |
72 | int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector ); | 74 | int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector ); |
73 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector ); | 75 | int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec); |
76 | int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector, int *is_partial ); | ||
74 | 77 | ||
75 | #endif | 78 | #endif |
diff --git a/trackerlogic.c b/trackerlogic.c index 6fd2724..13d2741 100644 --- a/trackerlogic.c +++ b/trackerlogic.c | |||
@@ -517,6 +517,29 @@ size_t peer_size_from_peer6(ot_peer6 *peer) { | |||
517 | return OT_PEER_SIZE4; | 517 | return OT_PEER_SIZE4; |
518 | } | 518 | } |
519 | 519 | ||
520 | void trackerlogic_add_random_torrents(size_t amount) { | ||
521 | struct ot_workstruct ws; | ||
522 | memset( &ws, 0, sizeof(ws) ); | ||
523 | |||
524 | ws.inbuf=malloc(G_INBUF_SIZE); | ||
525 | ws.outbuf=malloc(G_OUTBUF_SIZE); | ||
526 | ws.reply=ws.outbuf; | ||
527 | ws.hash=ws.inbuf; | ||
528 | |||
529 | while( amount-- ) { | ||
530 | arc4random_buf(ws.hash, sizeof(ot_hash)); | ||
531 | arc4random_buf(&ws.peer, sizeof(ws.peer)); | ||
532 | |||
533 | OT_PEERFLAG(ws.peer) &= PEER_FLAG_SEEDING | PEER_FLAG_COMPLETED | PEER_FLAG_STOPPED; | ||
534 | |||
535 | add_peer_to_torrent_and_return_peers( FLAG_TCP, &ws, 1 ); | ||
536 | } | ||
537 | |||
538 | free(ws.inbuf); | ||
539 | free(ws.outbuf); | ||
540 | } | ||
541 | |||
542 | |||
520 | void exerr( char * message ) { | 543 | void exerr( char * message ) { |
521 | fprintf( stderr, "%s\n", message ); | 544 | fprintf( stderr, "%s\n", message ); |
522 | exit( 111 ); | 545 | exit( 111 ); |
diff --git a/trackerlogic.h b/trackerlogic.h index 9f5886d..bc488c9 100644 --- a/trackerlogic.h +++ b/trackerlogic.h | |||
@@ -190,6 +190,7 @@ size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws ); | |||
190 | size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply ); | 190 | size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply ); |
191 | size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply ); | 191 | size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply ); |
192 | void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count ); | 192 | void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count ); |
193 | void trackerlogic_add_random_torrents(size_t amount); | ||
193 | 194 | ||
194 | /* torrent iterator */ | 195 | /* torrent iterator */ |
195 | void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ); | 196 | void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ); |