diff options
Diffstat (limited to 'ot_fullscrape.c')
-rw-r--r-- | ot_fullscrape.c | 150 |
1 files changed, 146 insertions, 4 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index aed2ad9..6fd6d1c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
@@ -14,6 +14,10 @@ | |||
14 | #ifdef WANT_COMPRESSION_GZIP | 14 | #ifdef WANT_COMPRESSION_GZIP |
15 | #include <zlib.h> | 15 | #include <zlib.h> |
16 | #endif | 16 | #endif |
17 | #ifdef WANT_COMPRESSION_ZSTD | ||
18 | #include <zstd.h> | ||
19 | #endif | ||
20 | |||
17 | 21 | ||
18 | /* Libowfat */ | 22 | /* Libowfat */ |
19 | #include "byte.h" | 23 | #include "byte.h" |
@@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode); | |||
40 | #ifdef WANT_COMPRESSION_GZIP | 44 | #ifdef WANT_COMPRESSION_GZIP |
41 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); | 45 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); |
42 | #endif | 46 | #endif |
47 | #ifdef WANT_COMPRESSION_ZSTD | ||
48 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode); | ||
49 | #endif | ||
43 | 50 | ||
44 | /* Converter function from memory to human readable hex strings | 51 | /* Converter function from memory to human readable hex strings |
45 | XXX - Duplicated from ot_stats. Needs fix. */ | 52 | XXX - Duplicated from ot_stats. Needs fix. */ |
@@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) { | |||
64 | while (g_opentracker_running) { | 71 | while (g_opentracker_running) { |
65 | ot_tasktype tasktype = TASK_FULLSCRAPE; | 72 | ot_tasktype tasktype = TASK_FULLSCRAPE; |
66 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); | 73 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); |
74 | #ifdef WANT_COMPRESSION_ZSTD | ||
75 | if (tasktype & TASK_FLAG_ZSTD) | ||
76 | fullscrape_make_zstd(taskid, tasktype); | ||
77 | else | ||
78 | #endif | ||
67 | #ifdef WANT_COMPRESSION_GZIP | 79 | #ifdef WANT_COMPRESSION_GZIP |
68 | if (tasktype & TASK_FLAG_GZIP) | 80 | if (tasktype & TASK_FLAG_GZIP) |
69 | fullscrape_make_gzip(taskid, tasktype); | 81 | fullscrape_make_gzip(taskid, tasktype); |
@@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
205 | struct iovec iovector = {NULL, 0}; | 217 | struct iovec iovector = {NULL, 0}; |
206 | int zres; | 218 | int zres; |
207 | z_stream strm; | 219 | z_stream strm; |
208 | fprintf(stderr, "GZIP path\n"); | ||
209 | /* Setup return vector... */ | 220 | /* Setup return vector... */ |
210 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | 221 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
211 | if (!iovector.iov_base) | 222 | if (!iovector.iov_base) |
@@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
267 | mutex_bucket_unlock(bucket, 0); | 278 | mutex_bucket_unlock(bucket, 0); |
268 | 279 | ||
269 | /* Parent thread died? */ | 280 | /* Parent thread died? */ |
270 | if (!g_opentracker_running) | 281 | if (!g_opentracker_running) { |
282 | deflateEnd(&strm); | ||
271 | return; | 283 | return; |
284 | } | ||
272 | } | 285 | } |
273 | 286 | ||
274 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | 287 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
@@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
282 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; | 295 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; |
283 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | 296 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
284 | free(iovector.iov_base); | 297 | free(iovector.iov_base); |
285 | return mutex_bucket_unlock(bucket, 0); | 298 | deflateEnd(&strm); |
299 | return; | ||
286 | } | 300 | } |
287 | 301 | ||
288 | /* Check if there's a last batch of data in the zlib buffer */ | 302 | /* Check if there's a last batch of data in the zlib buffer */ |
@@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
293 | if (!iovector.iov_base) { | 307 | if (!iovector.iov_base) { |
294 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | 308 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); |
295 | deflateEnd(&strm); | 309 | deflateEnd(&strm); |
296 | return mutex_bucket_unlock(bucket, 0); | 310 | return; |
297 | } | 311 | } |
298 | strm.next_out = iovector.iov_base; | 312 | strm.next_out = iovector.iov_base; |
299 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | 313 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; |
@@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
311 | /* WANT_COMPRESSION_GZIP */ | 325 | /* WANT_COMPRESSION_GZIP */ |
312 | #endif | 326 | #endif |
313 | 327 | ||
328 | #ifdef WANT_COMPRESSION_ZSTD | ||
329 | |||
330 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode) { | ||
331 | int bucket; | ||
332 | char *r; | ||
333 | struct iovec iovector = {NULL, 0}; | ||
334 | ZSTD_CCtx *zstream = ZSTD_createCCtx(); | ||
335 | ZSTD_inBuffer inbuf; | ||
336 | ZSTD_outBuffer outbuf; | ||
337 | size_t more_bytes; | ||
338 | |||
339 | if (!zstream) | ||
340 | return; | ||
341 | |||
342 | /* Setup return vector... */ | ||
343 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
344 | if (!iovector.iov_base) { | ||
345 | ZSTD_freeCCtx(zstream); | ||
346 | return; | ||
347 | } | ||
348 | |||
349 | /* Working with a compression level 6 is half as fast as level 3, but | ||
350 | seems to be the last reasonable bump that's worth extra cpu */ | ||
351 | ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6); | ||
352 | |||
353 | outbuf.dst = iovector.iov_base; | ||
354 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
355 | outbuf.pos = 0; | ||
356 | |||
357 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
358 | inbuf.src = (const void *)"d5:filesd"; | ||
359 | inbuf.size = strlen("d5:filesd"); | ||
360 | inbuf.pos = 0; | ||
361 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
362 | } | ||
363 | |||
364 | /* For each bucket... */ | ||
365 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { | ||
366 | /* Get exclusive access to that bucket */ | ||
367 | ot_vector *torrents_list = mutex_bucket_lock(bucket); | ||
368 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); | ||
369 | size_t i; | ||
370 | |||
371 | /* For each torrent in this bucket.. */ | ||
372 | for (i = 0; i < torrents_list->size; ++i) { | ||
373 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
374 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); | ||
375 | inbuf.src = compress_buffer; | ||
376 | inbuf.size = r - compress_buffer; | ||
377 | inbuf.pos = 0; | ||
378 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
379 | |||
380 | /* Check if there still is enough buffer left */ | ||
381 | while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) { | ||
382 | iovector.iov_len = outbuf.size; | ||
383 | |||
384 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
385 | free(iovector.iov_base); | ||
386 | ZSTD_freeCCtx(zstream); | ||
387 | return mutex_bucket_unlock(bucket, 0); | ||
388 | } | ||
389 | /* Allocate a fresh output buffer */ | ||
390 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
391 | if (!iovector.iov_base) { | ||
392 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); | ||
393 | ZSTD_freeCCtx(zstream); | ||
394 | return mutex_bucket_unlock(bucket, 0); | ||
395 | } | ||
396 | |||
397 | outbuf.dst = iovector.iov_base; | ||
398 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
399 | outbuf.pos = 0; | ||
400 | |||
401 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
402 | } | ||
403 | } | ||
404 | |||
405 | /* All torrents done: release lock on current bucket */ | ||
406 | mutex_bucket_unlock(bucket, 0); | ||
407 | |||
408 | /* Parent thread died? */ | ||
409 | if (!g_opentracker_running) | ||
410 | return; | ||
411 | } | ||
412 | |||
413 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
414 | inbuf.src = (const void *)"ee"; | ||
415 | inbuf.size = strlen("ee"); | ||
416 | inbuf.pos = 0; | ||
417 | } | ||
418 | |||
419 | more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
420 | |||
421 | iovector.iov_len = outbuf.pos; | ||
422 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
423 | free(iovector.iov_base); | ||
424 | ZSTD_freeCCtx(zstream); | ||
425 | return; | ||
426 | } | ||
427 | |||
428 | /* Check if there's a last batch of data in the zlib buffer */ | ||
429 | if (more_bytes) { | ||
430 | /* Allocate a fresh output buffer */ | ||
431 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
432 | |||
433 | if (!iovector.iov_base) { | ||
434 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
435 | ZSTD_freeCCtx(zstream); | ||
436 | return; | ||
437 | } | ||
438 | |||
439 | outbuf.dst = iovector.iov_base; | ||
440 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
441 | outbuf.pos = 0; | ||
442 | |||
443 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
444 | |||
445 | /* Only pass the new buffer if there actually was some data left in the buffer */ | ||
446 | iovector.iov_len = outbuf.pos; | ||
447 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
448 | free(iovector.iov_base); | ||
449 | } | ||
450 | |||
451 | ZSTD_freeCCtx(zstream); | ||
452 | } | ||
453 | /* WANT_COMPRESSION_ZSTD */ | ||
454 | #endif | ||
455 | |||
314 | /* WANT_FULLSCRAPE */ | 456 | /* WANT_FULLSCRAPE */ |
315 | #endif | 457 | #endif |