diff options
author | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-18 14:54:34 +0200 |
---|---|---|
committer | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-18 14:54:34 +0200 |
commit | 33bd2c9094e7f90a62cb59cdf5cf670ac58d5308 (patch) | |
tree | 9b5825b65a655953a605c3cc7525eb9456e686e6 | |
parent | 160ba08074827f0ddecaf611dd9f9b15593be9c1 (diff) |
Add support for zstd
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | ot_fullscrape.c | 150 | ||||
-rw-r--r-- | ot_http.c | 35 | ||||
-rw-r--r-- | ot_http.h | 5 | ||||
-rw-r--r-- | ot_mutex.h | 3 | ||||
-rw-r--r-- | ot_stats.h | 1 |
6 files changed, 183 insertions, 16 deletions
@@ -27,6 +27,11 @@ STRIP?=strip | |||
27 | #FEATURES+=-DWANT_IP_FROM_QUERY_STRING | 27 | #FEATURES+=-DWANT_IP_FROM_QUERY_STRING |
28 | FEATURES+=-DWANT_COMPRESSION_GZIP | 28 | FEATURES+=-DWANT_COMPRESSION_GZIP |
29 | FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS | 29 | FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS |
30 | |||
31 | #FEATURES+=-DWANT_COMPRESSION_ZSTD | ||
32 | #FEATURES+=-DWANT_COMPRESSION_ZSTD_ALWAYS | ||
33 | #LDFLAGS+=-lzstd | ||
34 | |||
30 | #FEATURES+=-DWANT_LOG_NETWORKS | 35 | #FEATURES+=-DWANT_LOG_NETWORKS |
31 | #FEATURES+=-DWANT_RESTRICT_STATS | 36 | #FEATURES+=-DWANT_RESTRICT_STATS |
32 | #FEATURES+=-DWANT_IP_FROM_PROXY | 37 | #FEATURES+=-DWANT_IP_FROM_PROXY |
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index aed2ad9..6fd6d1c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
@@ -14,6 +14,10 @@ | |||
14 | #ifdef WANT_COMPRESSION_GZIP | 14 | #ifdef WANT_COMPRESSION_GZIP |
15 | #include <zlib.h> | 15 | #include <zlib.h> |
16 | #endif | 16 | #endif |
17 | #ifdef WANT_COMPRESSION_ZSTD | ||
18 | #include <zstd.h> | ||
19 | #endif | ||
20 | |||
17 | 21 | ||
18 | /* Libowfat */ | 22 | /* Libowfat */ |
19 | #include "byte.h" | 23 | #include "byte.h" |
@@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode); | |||
40 | #ifdef WANT_COMPRESSION_GZIP | 44 | #ifdef WANT_COMPRESSION_GZIP |
41 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); | 45 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); |
42 | #endif | 46 | #endif |
47 | #ifdef WANT_COMPRESSION_ZSTD | ||
48 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode); | ||
49 | #endif | ||
43 | 50 | ||
44 | /* Converter function from memory to human readable hex strings | 51 | /* Converter function from memory to human readable hex strings |
45 | XXX - Duplicated from ot_stats. Needs fix. */ | 52 | XXX - Duplicated from ot_stats. Needs fix. */ |
@@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) { | |||
64 | while (g_opentracker_running) { | 71 | while (g_opentracker_running) { |
65 | ot_tasktype tasktype = TASK_FULLSCRAPE; | 72 | ot_tasktype tasktype = TASK_FULLSCRAPE; |
66 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); | 73 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); |
74 | #ifdef WANT_COMPRESSION_ZSTD | ||
75 | if (tasktype & TASK_FLAG_ZSTD) | ||
76 | fullscrape_make_zstd(taskid, tasktype); | ||
77 | else | ||
78 | #endif | ||
67 | #ifdef WANT_COMPRESSION_GZIP | 79 | #ifdef WANT_COMPRESSION_GZIP |
68 | if (tasktype & TASK_FLAG_GZIP) | 80 | if (tasktype & TASK_FLAG_GZIP) |
69 | fullscrape_make_gzip(taskid, tasktype); | 81 | fullscrape_make_gzip(taskid, tasktype); |
@@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
205 | struct iovec iovector = {NULL, 0}; | 217 | struct iovec iovector = {NULL, 0}; |
206 | int zres; | 218 | int zres; |
207 | z_stream strm; | 219 | z_stream strm; |
208 | fprintf(stderr, "GZIP path\n"); | ||
209 | /* Setup return vector... */ | 220 | /* Setup return vector... */ |
210 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | 221 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
211 | if (!iovector.iov_base) | 222 | if (!iovector.iov_base) |
@@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
267 | mutex_bucket_unlock(bucket, 0); | 278 | mutex_bucket_unlock(bucket, 0); |
268 | 279 | ||
269 | /* Parent thread died? */ | 280 | /* Parent thread died? */ |
270 | if (!g_opentracker_running) | 281 | if (!g_opentracker_running) { |
282 | deflateEnd(&strm); | ||
271 | return; | 283 | return; |
284 | } | ||
272 | } | 285 | } |
273 | 286 | ||
274 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | 287 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
@@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
282 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; | 295 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; |
283 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | 296 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
284 | free(iovector.iov_base); | 297 | free(iovector.iov_base); |
285 | return mutex_bucket_unlock(bucket, 0); | 298 | deflateEnd(&strm); |
299 | return; | ||
286 | } | 300 | } |
287 | 301 | ||
288 | /* Check if there's a last batch of data in the zlib buffer */ | 302 | /* Check if there's a last batch of data in the zlib buffer */ |
@@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
293 | if (!iovector.iov_base) { | 307 | if (!iovector.iov_base) { |
294 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | 308 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); |
295 | deflateEnd(&strm); | 309 | deflateEnd(&strm); |
296 | return mutex_bucket_unlock(bucket, 0); | 310 | return; |
297 | } | 311 | } |
298 | strm.next_out = iovector.iov_base; | 312 | strm.next_out = iovector.iov_base; |
299 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | 313 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; |
@@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
311 | /* WANT_COMPRESSION_GZIP */ | 325 | /* WANT_COMPRESSION_GZIP */ |
312 | #endif | 326 | #endif |
313 | 327 | ||
328 | #ifdef WANT_COMPRESSION_ZSTD | ||
329 | |||
330 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode) { | ||
331 | int bucket; | ||
332 | char *r; | ||
333 | struct iovec iovector = {NULL, 0}; | ||
334 | ZSTD_CCtx *zstream = ZSTD_createCCtx(); | ||
335 | ZSTD_inBuffer inbuf; | ||
336 | ZSTD_outBuffer outbuf; | ||
337 | size_t more_bytes; | ||
338 | |||
339 | if (!zstream) | ||
340 | return; | ||
341 | |||
342 | /* Setup return vector... */ | ||
343 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
344 | if (!iovector.iov_base) { | ||
345 | ZSTD_freeCCtx(zstream); | ||
346 | return; | ||
347 | } | ||
348 | |||
349 | /* Working with a compression level 6 is half as fast as level 3, but | ||
350 | seems to be the last reasonable bump that's worth extra cpu */ | ||
351 | ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6); | ||
352 | |||
353 | outbuf.dst = iovector.iov_base; | ||
354 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
355 | outbuf.pos = 0; | ||
356 | |||
357 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
358 | inbuf.src = (const void *)"d5:filesd"; | ||
359 | inbuf.size = strlen("d5:filesd"); | ||
360 | inbuf.pos = 0; | ||
361 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
362 | } | ||
363 | |||
364 | /* For each bucket... */ | ||
365 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { | ||
366 | /* Get exclusive access to that bucket */ | ||
367 | ot_vector *torrents_list = mutex_bucket_lock(bucket); | ||
368 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); | ||
369 | size_t i; | ||
370 | |||
371 | /* For each torrent in this bucket.. */ | ||
372 | for (i = 0; i < torrents_list->size; ++i) { | ||
373 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
374 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); | ||
375 | inbuf.src = compress_buffer; | ||
376 | inbuf.size = r - compress_buffer; | ||
377 | inbuf.pos = 0; | ||
378 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
379 | |||
380 | /* Check if there still is enough buffer left */ | ||
381 | while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) { | ||
382 | iovector.iov_len = outbuf.size; | ||
383 | |||
384 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
385 | free(iovector.iov_base); | ||
386 | ZSTD_freeCCtx(zstream); | ||
387 | return mutex_bucket_unlock(bucket, 0); | ||
388 | } | ||
389 | /* Allocate a fresh output buffer */ | ||
390 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
391 | if (!iovector.iov_base) { | ||
392 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); | ||
393 | ZSTD_freeCCtx(zstream); | ||
394 | return mutex_bucket_unlock(bucket, 0); | ||
395 | } | ||
396 | |||
397 | outbuf.dst = iovector.iov_base; | ||
398 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
399 | outbuf.pos = 0; | ||
400 | |||
401 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
402 | } | ||
403 | } | ||
404 | |||
405 | /* All torrents done: release lock on current bucket */ | ||
406 | mutex_bucket_unlock(bucket, 0); | ||
407 | |||
408 | /* Parent thread died? */ | ||
409 | if (!g_opentracker_running) | ||
410 | return; | ||
411 | } | ||
412 | |||
413 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
414 | inbuf.src = (const void *)"ee"; | ||
415 | inbuf.size = strlen("ee"); | ||
416 | inbuf.pos = 0; | ||
417 | } | ||
418 | |||
419 | more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
420 | |||
421 | iovector.iov_len = outbuf.pos; | ||
422 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
423 | free(iovector.iov_base); | ||
424 | ZSTD_freeCCtx(zstream); | ||
425 | return; | ||
426 | } | ||
427 | |||
428 | /* Check if there's a last batch of data in the zlib buffer */ | ||
429 | if (more_bytes) { | ||
430 | /* Allocate a fresh output buffer */ | ||
431 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
432 | |||
433 | if (!iovector.iov_base) { | ||
434 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
435 | ZSTD_freeCCtx(zstream); | ||
436 | return; | ||
437 | } | ||
438 | |||
439 | outbuf.dst = iovector.iov_base; | ||
440 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
441 | outbuf.pos = 0; | ||
442 | |||
443 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
444 | |||
445 | /* Only pass the new buffer if there actually was some data left in the buffer */ | ||
446 | iovector.iov_len = outbuf.pos; | ||
447 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
448 | free(iovector.iov_base); | ||
449 | } | ||
450 | |||
451 | ZSTD_freeCCtx(zstream); | ||
452 | } | ||
453 | /* WANT_COMPRESSION_ZSTD */ | ||
454 | #endif | ||
455 | |||
314 | /* WANT_FULLSCRAPE */ | 456 | /* WANT_FULLSCRAPE */ |
315 | #endif | 457 | #endif |
@@ -159,7 +159,9 @@ ssize_t http_sendiovecdata(const int64 sock, struct ot_workstruct *ws, int iovec | |||
159 | 159 | ||
160 | if (iovec_entries) { | 160 | if (iovec_entries) { |
161 | 161 | ||
162 | if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) | 162 | if (cookie->flag & STRUCT_HTTP_FLAG_ZSTD) |
163 | encoding = "Content-Encoding: zstd\r\n"; | ||
164 | else if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) | ||
163 | encoding = "Content-Encoding: gzip\r\n"; | 165 | encoding = "Content-Encoding: gzip\r\n"; |
164 | else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2) | 166 | else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2) |
165 | encoding = "Content-Encoding: bzip2\r\n"; | 167 | encoding = "Content-Encoding: bzip2\r\n"; |
@@ -369,19 +371,34 @@ static ssize_t http_handle_fullscrape(const int64 sock, struct ot_workstruct *ws | |||
369 | } | 371 | } |
370 | #endif | 372 | #endif |
371 | 373 | ||
372 | #ifdef WANT_COMPRESSION_GZIP | 374 | |
375 | #if defined(WANT_COMPRESSION_GZIP) || defined(WANT_COMPRESSION_ZSTD) | ||
373 | ws->request[ws->request_size - 1] = 0; | 376 | ws->request[ws->request_size - 1] = 0; |
374 | #ifndef WANT_COMPRESSION_GZIP_ALWAYS | 377 | #ifdef WANT_COMPRESSION_GZIP |
375 | if (strstr(ws->request, "gzip")) { | 378 | if (strstr(ws->request, "gzip")) { |
376 | #endif | ||
377 | cookie->flag |= STRUCT_HTTP_FLAG_GZIP; | 379 | cookie->flag |= STRUCT_HTTP_FLAG_GZIP; |
378 | format = TASK_FLAG_GZIP; | 380 | format |= TASK_FLAG_GZIP; |
379 | stats_issue_event(EVENT_FULLSCRAPE_REQUEST_GZIP, 0, (uintptr_t)cookie->ip); | 381 | } |
380 | #ifndef WANT_COMPRESSION_GZIP_ALWAYS | ||
381 | } else | ||
382 | #endif | 382 | #endif |
383 | #ifdef WANT_COMPRESSION_ZSTD | ||
384 | if (strstr(ws->request, "zstd")) { | ||
385 | cookie->flag |= STRUCT_HTTP_FLAG_ZSTD; | ||
386 | format |= TASK_FLAG_ZSTD; | ||
387 | } | ||
388 | #endif | ||
389 | |||
390 | #if defined(WANT_COMPRESSION_ZSTD) && defined(WANT_COMPRESSION_ZSTD_ALWAYS) | ||
391 | cookie->flag |= STRUCT_HTTP_FLAG_ZSTD; | ||
392 | format |= TASK_FLAG_ZSTD; | ||
383 | #endif | 393 | #endif |
384 | stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); | 394 | |
395 | #if defined(WANT_COMPRESSION_GZIP) && defined(WANT_COMPRESSION_GZIP_ALWAYS) | ||
396 | cookie->flag |= STRUCT_HTTP_FLAG_GZIP; | ||
397 | format |= TASK_FLAG_GZIP; | ||
398 | #endif | ||
399 | #endif | ||
400 | |||
401 | stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); | ||
385 | 402 | ||
386 | #ifdef _DEBUG_HTTPERROR | 403 | #ifdef _DEBUG_HTTPERROR |
387 | fprintf(stderr, "%s", ws->debugbuf); | 404 | fprintf(stderr, "%s", ws->debugbuf); |
@@ -10,8 +10,9 @@ typedef enum { | |||
10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, | 10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, |
11 | STRUCT_HTTP_FLAG_GZIP = 2, | 11 | STRUCT_HTTP_FLAG_GZIP = 2, |
12 | STRUCT_HTTP_FLAG_BZIP2 = 4, | 12 | STRUCT_HTTP_FLAG_BZIP2 = 4, |
13 | STRUCT_HTTP_FLAG_CHUNKED = 8, | 13 | STRUCT_HTTP_FLAG_ZSTD = 8, |
14 | STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16 | 14 | STRUCT_HTTP_FLAG_CHUNKED = 16, |
15 | STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 32 | ||
15 | } STRUCT_HTTP_FLAG; | 16 | } STRUCT_HTTP_FLAG; |
16 | 17 | ||
17 | struct http_data { | 18 | struct http_data { |
@@ -59,7 +59,8 @@ typedef enum { | |||
59 | 59 | ||
60 | TASK_FLAG_GZIP = 0x1000, | 60 | TASK_FLAG_GZIP = 0x1000, |
61 | TASK_FLAG_BZIP2 = 0x2000, | 61 | TASK_FLAG_BZIP2 = 0x2000, |
62 | TASK_FLAG_CHUNKED = 0x4000, | 62 | TASK_FLAG_ZSTD = 0x4000, |
63 | TASK_FLAG_CHUNKED = 0x8000, | ||
63 | 64 | ||
64 | TASK_TASK_MASK = 0x0fff, | 65 | TASK_TASK_MASK = 0x0fff, |
65 | TASK_CLASS_MASK = 0x0f00, | 66 | TASK_CLASS_MASK = 0x0f00, |
@@ -19,6 +19,7 @@ typedef enum { | |||
19 | EVENT_SCRAPE, | 19 | EVENT_SCRAPE, |
20 | EVENT_FULLSCRAPE_REQUEST, | 20 | EVENT_FULLSCRAPE_REQUEST, |
21 | EVENT_FULLSCRAPE_REQUEST_GZIP, | 21 | EVENT_FULLSCRAPE_REQUEST_GZIP, |
22 | EVENT_FULLSCRAPE_REQUEST_ZSTD, | ||
22 | EVENT_FULLSCRAPE, /* TCP only */ | 23 | EVENT_FULLSCRAPE, /* TCP only */ |
23 | EVENT_FAILED, | 24 | EVENT_FAILED, |
24 | EVENT_BUCKET_LOCKED, | 25 | EVENT_BUCKET_LOCKED, |