diff options
Diffstat (limited to 'ot_fullscrape.c')
| -rw-r--r-- | ot_fullscrape.c | 150 |
1 files changed, 146 insertions, 4 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index aed2ad9..6fd6d1c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
| @@ -14,6 +14,10 @@ | |||
| 14 | #ifdef WANT_COMPRESSION_GZIP | 14 | #ifdef WANT_COMPRESSION_GZIP |
| 15 | #include <zlib.h> | 15 | #include <zlib.h> |
| 16 | #endif | 16 | #endif |
| 17 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 18 | #include <zstd.h> | ||
| 19 | #endif | ||
| 20 | |||
| 17 | 21 | ||
| 18 | /* Libowfat */ | 22 | /* Libowfat */ |
| 19 | #include "byte.h" | 23 | #include "byte.h" |
| @@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode); | |||
| 40 | #ifdef WANT_COMPRESSION_GZIP | 44 | #ifdef WANT_COMPRESSION_GZIP |
| 41 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); | 45 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); |
| 42 | #endif | 46 | #endif |
| 47 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 48 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode); | ||
| 49 | #endif | ||
| 43 | 50 | ||
| 44 | /* Converter function from memory to human readable hex strings | 51 | /* Converter function from memory to human readable hex strings |
| 45 | XXX - Duplicated from ot_stats. Needs fix. */ | 52 | XXX - Duplicated from ot_stats. Needs fix. */ |
| @@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) { | |||
| 64 | while (g_opentracker_running) { | 71 | while (g_opentracker_running) { |
| 65 | ot_tasktype tasktype = TASK_FULLSCRAPE; | 72 | ot_tasktype tasktype = TASK_FULLSCRAPE; |
| 66 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); | 73 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); |
| 74 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 75 | if (tasktype & TASK_FLAG_ZSTD) | ||
| 76 | fullscrape_make_zstd(taskid, tasktype); | ||
| 77 | else | ||
| 78 | #endif | ||
| 67 | #ifdef WANT_COMPRESSION_GZIP | 79 | #ifdef WANT_COMPRESSION_GZIP |
| 68 | if (tasktype & TASK_FLAG_GZIP) | 80 | if (tasktype & TASK_FLAG_GZIP) |
| 69 | fullscrape_make_gzip(taskid, tasktype); | 81 | fullscrape_make_gzip(taskid, tasktype); |
| @@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 205 | struct iovec iovector = {NULL, 0}; | 217 | struct iovec iovector = {NULL, 0}; |
| 206 | int zres; | 218 | int zres; |
| 207 | z_stream strm; | 219 | z_stream strm; |
| 208 | fprintf(stderr, "GZIP path\n"); | ||
| 209 | /* Setup return vector... */ | 220 | /* Setup return vector... */ |
| 210 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | 221 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
| 211 | if (!iovector.iov_base) | 222 | if (!iovector.iov_base) |
| @@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 267 | mutex_bucket_unlock(bucket, 0); | 278 | mutex_bucket_unlock(bucket, 0); |
| 268 | 279 | ||
| 269 | /* Parent thread died? */ | 280 | /* Parent thread died? */ |
| 270 | if (!g_opentracker_running) | 281 | if (!g_opentracker_running) { |
| 282 | deflateEnd(&strm); | ||
| 271 | return; | 283 | return; |
| 284 | } | ||
| 272 | } | 285 | } |
| 273 | 286 | ||
| 274 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | 287 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
| @@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 282 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; | 295 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; |
| 283 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | 296 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
| 284 | free(iovector.iov_base); | 297 | free(iovector.iov_base); |
| 285 | return mutex_bucket_unlock(bucket, 0); | 298 | deflateEnd(&strm); |
| 299 | return; | ||
| 286 | } | 300 | } |
| 287 | 301 | ||
| 288 | /* Check if there's a last batch of data in the zlib buffer */ | 302 | /* Check if there's a last batch of data in the zlib buffer */ |
| @@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 293 | if (!iovector.iov_base) { | 307 | if (!iovector.iov_base) { |
| 294 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | 308 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); |
| 295 | deflateEnd(&strm); | 309 | deflateEnd(&strm); |
| 296 | return mutex_bucket_unlock(bucket, 0); | 310 | return; |
| 297 | } | 311 | } |
| 298 | strm.next_out = iovector.iov_base; | 312 | strm.next_out = iovector.iov_base; |
| 299 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | 313 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; |
| @@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 311 | /* WANT_COMPRESSION_GZIP */ | 325 | /* WANT_COMPRESSION_GZIP */ |
| 312 | #endif | 326 | #endif |
| 313 | 327 | ||
| 328 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 329 | |||
| 330 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode) { | ||
| 331 | int bucket; | ||
| 332 | char *r; | ||
| 333 | struct iovec iovector = {NULL, 0}; | ||
| 334 | ZSTD_CCtx *zstream = ZSTD_createCCtx(); | ||
| 335 | ZSTD_inBuffer inbuf; | ||
| 336 | ZSTD_outBuffer outbuf; | ||
| 337 | size_t more_bytes; | ||
| 338 | |||
| 339 | if (!zstream) | ||
| 340 | return; | ||
| 341 | |||
| 342 | /* Setup return vector... */ | ||
| 343 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
| 344 | if (!iovector.iov_base) { | ||
| 345 | ZSTD_freeCCtx(zstream); | ||
| 346 | return; | ||
| 347 | } | ||
| 348 | |||
| 349 | /* Working with a compression level 6 is half as fast as level 3, but | ||
| 350 | seems to be the last reasonable bump that's worth extra cpu */ | ||
| 351 | ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6); | ||
| 352 | |||
| 353 | outbuf.dst = iovector.iov_base; | ||
| 354 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
| 355 | outbuf.pos = 0; | ||
| 356 | |||
| 357 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
| 358 | inbuf.src = (const void *)"d5:filesd"; | ||
| 359 | inbuf.size = strlen("d5:filesd"); | ||
| 360 | inbuf.pos = 0; | ||
| 361 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
| 362 | } | ||
| 363 | |||
| 364 | /* For each bucket... */ | ||
| 365 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { | ||
| 366 | /* Get exclusive access to that bucket */ | ||
| 367 | ot_vector *torrents_list = mutex_bucket_lock(bucket); | ||
| 368 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); | ||
| 369 | size_t i; | ||
| 370 | |||
| 371 | /* For each torrent in this bucket.. */ | ||
| 372 | for (i = 0; i < torrents_list->size; ++i) { | ||
| 373 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
| 374 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); | ||
| 375 | inbuf.src = compress_buffer; | ||
| 376 | inbuf.size = r - compress_buffer; | ||
| 377 | inbuf.pos = 0; | ||
| 378 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
| 379 | |||
| 380 | /* Check if there still is enough buffer left */ | ||
| 381 | while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) { | ||
| 382 | iovector.iov_len = outbuf.size; | ||
| 383 | |||
| 384 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
| 385 | free(iovector.iov_base); | ||
| 386 | ZSTD_freeCCtx(zstream); | ||
| 387 | return mutex_bucket_unlock(bucket, 0); | ||
| 388 | } | ||
| 389 | /* Allocate a fresh output buffer */ | ||
| 390 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
| 391 | if (!iovector.iov_base) { | ||
| 392 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); | ||
| 393 | ZSTD_freeCCtx(zstream); | ||
| 394 | return mutex_bucket_unlock(bucket, 0); | ||
| 395 | } | ||
| 396 | |||
| 397 | outbuf.dst = iovector.iov_base; | ||
| 398 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
| 399 | outbuf.pos = 0; | ||
| 400 | |||
| 401 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
| 402 | } | ||
| 403 | } | ||
| 404 | |||
| 405 | /* All torrents done: release lock on current bucket */ | ||
| 406 | mutex_bucket_unlock(bucket, 0); | ||
| 407 | |||
| 408 | /* Parent thread died? */ | ||
| 409 | if (!g_opentracker_running) | ||
| 410 | return; | ||
| 411 | } | ||
| 412 | |||
| 413 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
| 414 | inbuf.src = (const void *)"ee"; | ||
| 415 | inbuf.size = strlen("ee"); | ||
| 416 | inbuf.pos = 0; | ||
| 417 | } | ||
| 418 | |||
| 419 | more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
| 420 | |||
| 421 | iovector.iov_len = outbuf.pos; | ||
| 422 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
| 423 | free(iovector.iov_base); | ||
| 424 | ZSTD_freeCCtx(zstream); | ||
| 425 | return; | ||
| 426 | } | ||
| 427 | |||
| 428 | /* Check if there's a last batch of data in the zlib buffer */ | ||
| 429 | if (more_bytes) { | ||
| 430 | /* Allocate a fresh output buffer */ | ||
| 431 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
| 432 | |||
| 433 | if (!iovector.iov_base) { | ||
| 434 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
| 435 | ZSTD_freeCCtx(zstream); | ||
| 436 | return; | ||
| 437 | } | ||
| 438 | |||
| 439 | outbuf.dst = iovector.iov_base; | ||
| 440 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
| 441 | outbuf.pos = 0; | ||
| 442 | |||
| 443 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
| 444 | |||
| 445 | /* Only pass the new buffer if there actually was some data left in the buffer */ | ||
| 446 | iovector.iov_len = outbuf.pos; | ||
| 447 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
| 448 | free(iovector.iov_base); | ||
| 449 | } | ||
| 450 | |||
| 451 | ZSTD_freeCCtx(zstream); | ||
| 452 | } | ||
| 453 | /* WANT_COMPRESSION_ZSTD */ | ||
| 454 | #endif | ||
| 455 | |||
| 314 | /* WANT_FULLSCRAPE */ | 456 | /* WANT_FULLSCRAPE */ |
| 315 | #endif | 457 | #endif |
