summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDirk Engling <erdgeist@erdgeist.org>2024-04-18 14:54:34 +0200
committerDirk Engling <erdgeist@erdgeist.org>2024-04-18 14:54:34 +0200
commit33bd2c9094e7f90a62cb59cdf5cf670ac58d5308 (patch)
tree9b5825b65a655953a605c3cc7525eb9456e686e6
parent160ba08074827f0ddecaf611dd9f9b15593be9c1 (diff)
Add support for zstd
-rw-r--r--Makefile5
-rw-r--r--ot_fullscrape.c150
-rw-r--r--ot_http.c35
-rw-r--r--ot_http.h5
-rw-r--r--ot_mutex.h3
-rw-r--r--ot_stats.h1
6 files changed, 183 insertions, 16 deletions
diff --git a/Makefile b/Makefile
index a224845..e5ca6e4 100644
--- a/Makefile
+++ b/Makefile
@@ -27,6 +27,11 @@ STRIP?=strip
27#FEATURES+=-DWANT_IP_FROM_QUERY_STRING 27#FEATURES+=-DWANT_IP_FROM_QUERY_STRING
28FEATURES+=-DWANT_COMPRESSION_GZIP 28FEATURES+=-DWANT_COMPRESSION_GZIP
29FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS 29FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS
30
31#FEATURES+=-DWANT_COMPRESSION_ZSTD
32#FEATURES+=-DWANT_COMPRESSION_ZSTD_ALWAYS
33#LDFLAGS+=-lzstd
34
30#FEATURES+=-DWANT_LOG_NETWORKS 35#FEATURES+=-DWANT_LOG_NETWORKS
31#FEATURES+=-DWANT_RESTRICT_STATS 36#FEATURES+=-DWANT_RESTRICT_STATS
32#FEATURES+=-DWANT_IP_FROM_PROXY 37#FEATURES+=-DWANT_IP_FROM_PROXY
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index aed2ad9..6fd6d1c 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -14,6 +14,10 @@
14#ifdef WANT_COMPRESSION_GZIP 14#ifdef WANT_COMPRESSION_GZIP
15#include <zlib.h> 15#include <zlib.h>
16#endif 16#endif
17#ifdef WANT_COMPRESSION_ZSTD
18#include <zstd.h>
19#endif
20
17 21
18/* Libowfat */ 22/* Libowfat */
19#include "byte.h" 23#include "byte.h"
@@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode);
40#ifdef WANT_COMPRESSION_GZIP 44#ifdef WANT_COMPRESSION_GZIP
41static void fullscrape_make_gzip(int taskid, ot_tasktype mode); 45static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
42#endif 46#endif
47#ifdef WANT_COMPRESSION_ZSTD
48static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
49#endif
43 50
44/* Converter function from memory to human readable hex strings 51/* Converter function from memory to human readable hex strings
45 XXX - Duplicated from ot_stats. Needs fix. */ 52 XXX - Duplicated from ot_stats. Needs fix. */
@@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) {
64 while (g_opentracker_running) { 71 while (g_opentracker_running) {
65 ot_tasktype tasktype = TASK_FULLSCRAPE; 72 ot_tasktype tasktype = TASK_FULLSCRAPE;
66 ot_taskid taskid = mutex_workqueue_poptask(&tasktype); 73 ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
74#ifdef WANT_COMPRESSION_ZSTD
75 if (tasktype & TASK_FLAG_ZSTD)
76 fullscrape_make_zstd(taskid, tasktype);
77 else
78#endif
67#ifdef WANT_COMPRESSION_GZIP 79#ifdef WANT_COMPRESSION_GZIP
68 if (tasktype & TASK_FLAG_GZIP) 80 if (tasktype & TASK_FLAG_GZIP)
69 fullscrape_make_gzip(taskid, tasktype); 81 fullscrape_make_gzip(taskid, tasktype);
@@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
205 struct iovec iovector = {NULL, 0}; 217 struct iovec iovector = {NULL, 0};
206 int zres; 218 int zres;
207 z_stream strm; 219 z_stream strm;
208 fprintf(stderr, "GZIP path\n");
209 /* Setup return vector... */ 220 /* Setup return vector... */
210 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); 221 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
211 if (!iovector.iov_base) 222 if (!iovector.iov_base)
@@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
267 mutex_bucket_unlock(bucket, 0); 278 mutex_bucket_unlock(bucket, 0);
268 279
269 /* Parent thread died? */ 280 /* Parent thread died? */
270 if (!g_opentracker_running) 281 if (!g_opentracker_running) {
282 deflateEnd(&strm);
271 return; 283 return;
284 }
272 } 285 }
273 286
274 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { 287 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
@@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
282 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; 295 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
283 if (mutex_workqueue_pushchunked(taskid, &iovector)) { 296 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
284 free(iovector.iov_base); 297 free(iovector.iov_base);
285 return mutex_bucket_unlock(bucket, 0); 298 deflateEnd(&strm);
299 return;
286 } 300 }
287 301
288 /* Check if there's a last batch of data in the zlib buffer */ 302 /* Check if there's a last batch of data in the zlib buffer */
@@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
293 if (!iovector.iov_base) { 307 if (!iovector.iov_base) {
294 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); 308 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
295 deflateEnd(&strm); 309 deflateEnd(&strm);
296 return mutex_bucket_unlock(bucket, 0); 310 return;
297 } 311 }
298 strm.next_out = iovector.iov_base; 312 strm.next_out = iovector.iov_base;
299 strm.avail_out = OT_SCRAPE_CHUNK_SIZE; 313 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
@@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
311/* WANT_COMPRESSION_GZIP */ 325/* WANT_COMPRESSION_GZIP */
312#endif 326#endif
313 327
328#ifdef WANT_COMPRESSION_ZSTD
329
330static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
331 int bucket;
332 char *r;
333 struct iovec iovector = {NULL, 0};
334 ZSTD_CCtx *zstream = ZSTD_createCCtx();
335 ZSTD_inBuffer inbuf;
336 ZSTD_outBuffer outbuf;
337 size_t more_bytes;
338
339 if (!zstream)
340 return;
341
342 /* Setup return vector... */
343 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
344 if (!iovector.iov_base) {
345 ZSTD_freeCCtx(zstream);
346 return;
347 }
348
349 /* Working with a compression level 6 is half as fast as level 3, but
350 seems to be the last reasonable bump that's worth extra cpu */
351 ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
352
353 outbuf.dst = iovector.iov_base;
354 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
355 outbuf.pos = 0;
356
357 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
358 inbuf.src = (const void *)"d5:filesd";
359 inbuf.size = strlen("d5:filesd");
360 inbuf.pos = 0;
361 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
362 }
363
364 /* For each bucket... */
365 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
366 /* Get exclusive access to that bucket */
367 ot_vector *torrents_list = mutex_bucket_lock(bucket);
368 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
369 size_t i;
370
371 /* For each torrent in this bucket.. */
372 for (i = 0; i < torrents_list->size; ++i) {
373 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
374 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
375 inbuf.src = compress_buffer;
376 inbuf.size = r - compress_buffer;
377 inbuf.pos = 0;
378 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
379
380 /* Check if there still is enough buffer left */
381 while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
382 iovector.iov_len = outbuf.size;
383
384 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
385 free(iovector.iov_base);
386 ZSTD_freeCCtx(zstream);
387 return mutex_bucket_unlock(bucket, 0);
388 }
389 /* Allocate a fresh output buffer */
390 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
391 if (!iovector.iov_base) {
392 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
393 ZSTD_freeCCtx(zstream);
394 return mutex_bucket_unlock(bucket, 0);
395 }
396
397 outbuf.dst = iovector.iov_base;
398 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
399 outbuf.pos = 0;
400
401 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
402 }
403 }
404
405 /* All torrents done: release lock on current bucket */
406 mutex_bucket_unlock(bucket, 0);
407
408 /* Parent thread died? */
409 if (!g_opentracker_running)
410 return;
411 }
412
413 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
414 inbuf.src = (const void *)"ee";
415 inbuf.size = strlen("ee");
416 inbuf.pos = 0;
417 }
418
419 more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
420
421 iovector.iov_len = outbuf.pos;
422 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
423 free(iovector.iov_base);
424 ZSTD_freeCCtx(zstream);
425 return;
426 }
427
428 /* Check if there's a last batch of data in the zlib buffer */
429 if (more_bytes) {
430 /* Allocate a fresh output buffer */
431 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
432
433 if (!iovector.iov_base) {
434 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
435 ZSTD_freeCCtx(zstream);
436 return;
437 }
438
439 outbuf.dst = iovector.iov_base;
440 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
441 outbuf.pos = 0;
442
443 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
444
445 /* Only pass the new buffer if there actually was some data left in the buffer */
446 iovector.iov_len = outbuf.pos;
447 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
448 free(iovector.iov_base);
449 }
450
451 ZSTD_freeCCtx(zstream);
452}
453/* WANT_COMPRESSION_ZSTD */
454#endif
455
314/* WANT_FULLSCRAPE */ 456/* WANT_FULLSCRAPE */
315#endif 457#endif
diff --git a/ot_http.c b/ot_http.c
index cd2dfc1..af3f210 100644
--- a/ot_http.c
+++ b/ot_http.c
@@ -159,7 +159,9 @@ ssize_t http_sendiovecdata(const int64 sock, struct ot_workstruct *ws, int iovec
159 159
160 if (iovec_entries) { 160 if (iovec_entries) {
161 161
162 if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) 162 if (cookie->flag & STRUCT_HTTP_FLAG_ZSTD)
163 encoding = "Content-Encoding: zstd\r\n";
164 else if (cookie->flag & STRUCT_HTTP_FLAG_GZIP)
163 encoding = "Content-Encoding: gzip\r\n"; 165 encoding = "Content-Encoding: gzip\r\n";
164 else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2) 166 else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2)
165 encoding = "Content-Encoding: bzip2\r\n"; 167 encoding = "Content-Encoding: bzip2\r\n";
@@ -369,19 +371,34 @@ static ssize_t http_handle_fullscrape(const int64 sock, struct ot_workstruct *ws
369 } 371 }
370#endif 372#endif
371 373
372#ifdef WANT_COMPRESSION_GZIP 374
375#if defined(WANT_COMPRESSION_GZIP) || defined(WANT_COMPRESSION_ZSTD)
373 ws->request[ws->request_size - 1] = 0; 376 ws->request[ws->request_size - 1] = 0;
374#ifndef WANT_COMPRESSION_GZIP_ALWAYS 377#ifdef WANT_COMPRESSION_GZIP
375 if (strstr(ws->request, "gzip")) { 378 if (strstr(ws->request, "gzip")) {
376#endif
377 cookie->flag |= STRUCT_HTTP_FLAG_GZIP; 379 cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
378 format = TASK_FLAG_GZIP; 380 format |= TASK_FLAG_GZIP;
379 stats_issue_event(EVENT_FULLSCRAPE_REQUEST_GZIP, 0, (uintptr_t)cookie->ip); 381 }
380#ifndef WANT_COMPRESSION_GZIP_ALWAYS
381 } else
382#endif 382#endif
383#ifdef WANT_COMPRESSION_ZSTD
384 if (strstr(ws->request, "zstd")) {
385 cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
386 format |= TASK_FLAG_ZSTD;
387 }
388#endif
389
390#if defined(WANT_COMPRESSION_ZSTD) && defined(WANT_COMPRESSION_ZSTD_ALWAYS)
391 cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
392 format |= TASK_FLAG_ZSTD;
383#endif 393#endif
384 stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); 394
395#if defined(WANT_COMPRESSION_GZIP) && defined(WANT_COMPRESSION_GZIP_ALWAYS)
396 cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
397 format |= TASK_FLAG_GZIP;
398#endif
399#endif
400
401 stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip);
385 402
386#ifdef _DEBUG_HTTPERROR 403#ifdef _DEBUG_HTTPERROR
387 fprintf(stderr, "%s", ws->debugbuf); 404 fprintf(stderr, "%s", ws->debugbuf);
diff --git a/ot_http.h b/ot_http.h
index fecb4eb..b5ae9ff 100644
--- a/ot_http.h
+++ b/ot_http.h
@@ -10,8 +10,9 @@ typedef enum {
10 STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, 10 STRUCT_HTTP_FLAG_WAITINGFORTASK = 1,
11 STRUCT_HTTP_FLAG_GZIP = 2, 11 STRUCT_HTTP_FLAG_GZIP = 2,
12 STRUCT_HTTP_FLAG_BZIP2 = 4, 12 STRUCT_HTTP_FLAG_BZIP2 = 4,
13 STRUCT_HTTP_FLAG_CHUNKED = 8, 13 STRUCT_HTTP_FLAG_ZSTD = 8,
14 STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16 14 STRUCT_HTTP_FLAG_CHUNKED = 16,
15 STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 32
15} STRUCT_HTTP_FLAG; 16} STRUCT_HTTP_FLAG;
16 17
17struct http_data { 18struct http_data {
diff --git a/ot_mutex.h b/ot_mutex.h
index 66b627f..cdfabc9 100644
--- a/ot_mutex.h
+++ b/ot_mutex.h
@@ -59,7 +59,8 @@ typedef enum {
59 59
60 TASK_FLAG_GZIP = 0x1000, 60 TASK_FLAG_GZIP = 0x1000,
61 TASK_FLAG_BZIP2 = 0x2000, 61 TASK_FLAG_BZIP2 = 0x2000,
62 TASK_FLAG_CHUNKED = 0x4000, 62 TASK_FLAG_ZSTD = 0x4000,
63 TASK_FLAG_CHUNKED = 0x8000,
63 64
64 TASK_TASK_MASK = 0x0fff, 65 TASK_TASK_MASK = 0x0fff,
65 TASK_CLASS_MASK = 0x0f00, 66 TASK_CLASS_MASK = 0x0f00,
diff --git a/ot_stats.h b/ot_stats.h
index 4f75049..8ed2b1e 100644
--- a/ot_stats.h
+++ b/ot_stats.h
@@ -19,6 +19,7 @@ typedef enum {
19 EVENT_SCRAPE, 19 EVENT_SCRAPE,
20 EVENT_FULLSCRAPE_REQUEST, 20 EVENT_FULLSCRAPE_REQUEST,
21 EVENT_FULLSCRAPE_REQUEST_GZIP, 21 EVENT_FULLSCRAPE_REQUEST_GZIP,
22 EVENT_FULLSCRAPE_REQUEST_ZSTD,
22 EVENT_FULLSCRAPE, /* TCP only */ 23 EVENT_FULLSCRAPE, /* TCP only */
23 EVENT_FAILED, 24 EVENT_FAILED,
24 EVENT_BUCKET_LOCKED, 25 EVENT_BUCKET_LOCKED,