summaryrefslogtreecommitdiff
path: root/ot_fullscrape.c
diff options
context:
space:
mode:
Diffstat (limited to 'ot_fullscrape.c')
-rw-r--r--ot_fullscrape.c510
1 files changed, 359 insertions, 151 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index faea4b9..6fd6d1c 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -6,14 +6,18 @@
6#ifdef WANT_FULLSCRAPE 6#ifdef WANT_FULLSCRAPE
7 7
8/* System */ 8/* System */
9#include <sys/param.h> 9#include <arpa/inet.h>
10#include <pthread.h>
10#include <stdio.h> 11#include <stdio.h>
11#include <string.h> 12#include <string.h>
12#include <pthread.h> 13#include <sys/param.h>
13#include <arpa/inet.h>
14#ifdef WANT_COMPRESSION_GZIP 14#ifdef WANT_COMPRESSION_GZIP
15#include <zlib.h> 15#include <zlib.h>
16#endif 16#endif
17#ifdef WANT_COMPRESSION_ZSTD
18#include <zstd.h>
19#endif
20
17 21
18/* Libowfat */ 22/* Libowfat */
19#include "byte.h" 23#include "byte.h"
@@ -21,52 +25,64 @@
21#include "textcode.h" 25#include "textcode.h"
22 26
23/* Opentracker */ 27/* Opentracker */
24#include "trackerlogic.h"
25#include "ot_mutex.h"
26#include "ot_iovec.h"
27#include "ot_fullscrape.h" 28#include "ot_fullscrape.h"
29#include "ot_iovec.h"
30#include "ot_mutex.h"
31#include "trackerlogic.h"
28 32
29/* Fetch full scrape info for all torrents 33/* Fetch full scrape info for all torrents
30 Full scrapes usually are huge and one does not want to 34 Full scrapes usually are huge and one does not want to
31 allocate more memory. So lets get them in 512k units 35 allocate more memory. So lets get them in 512k units
32*/ 36*/
33#define OT_SCRAPE_CHUNK_SIZE (512*1024) 37#define OT_SCRAPE_CHUNK_SIZE (1024 * 1024)
34 38
35/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ 39/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
36#define OT_SCRAPE_MAXENTRYLEN 256 40#define OT_SCRAPE_MAXENTRYLEN 256
37 41
42/* Forward declaration */
43static void fullscrape_make(int taskid, ot_tasktype mode);
38#ifdef WANT_COMPRESSION_GZIP 44#ifdef WANT_COMPRESSION_GZIP
39#define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK 45static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
40#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) , param1, param2, param3 46#endif
41#else 47#ifdef WANT_COMPRESSION_ZSTD
42#define IF_COMPRESSION( TASK ) 48static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
43#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 )
44#endif 49#endif
45
46/* Forward declaration */
47static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
48 50
49/* Converter function from memory to human readable hex strings 51/* Converter function from memory to human readable hex strings
50 XXX - Duplicated from ot_stats. Needs fix. */ 52 XXX - Duplicated from ot_stats. Needs fix. */
51static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} 53static char *to_hex(char *d, uint8_t *s) {
54 char *m = "0123456789ABCDEF";
55 char *t = d;
56 char *e = d + 40;
57 while (d < e) {
58 *d++ = m[*s >> 4];
59 *d++ = m[*s++ & 15];
60 }
61 *d = 0;
62 return t;
63}
52 64
53/* This is the entry point into this worker thread 65/* This is the entry point into this worker thread
54 It grabs tasks from mutex_tasklist and delivers results back 66 It grabs tasks from mutex_tasklist and delivers results back
55*/ 67*/
56static void * fullscrape_worker( void * args ) { 68static void *fullscrape_worker(void *args) {
57 int iovec_entries; 69 (void)args;
58 struct iovec *iovector;
59
60 (void) args;
61 70
62 while( 1 ) { 71 while (g_opentracker_running) {
63 ot_tasktype tasktype = TASK_FULLSCRAPE; 72 ot_tasktype tasktype = TASK_FULLSCRAPE;
64 ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); 73 ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
65 fullscrape_make( &iovec_entries, &iovector, tasktype ); 74#ifdef WANT_COMPRESSION_ZSTD
66 if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) 75 if (tasktype & TASK_FLAG_ZSTD)
67 iovec_free( &iovec_entries, &iovector ); 76 fullscrape_make_zstd(taskid, tasktype);
68 if( !g_opentracker_running ) 77 else
69 return NULL; 78#endif
79#ifdef WANT_COMPRESSION_GZIP
80 if (tasktype & TASK_FLAG_GZIP)
81 fullscrape_make_gzip(taskid, tasktype);
82 else
83#endif
84 fullscrape_make(taskid, tasktype);
85 mutex_workqueue_pushchunked(taskid, NULL);
70 } 86 }
71 return NULL; 87 return NULL;
72} 88}
@@ -84,166 +100,358 @@ void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
84 mutex_workqueue_pushtask( sock, tasktype ); 100 mutex_workqueue_pushtask( sock, tasktype );
85} 101}
86 102
87static int fullscrape_increase( int *iovec_entries, struct iovec **iovector, 103static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torrent, ot_hash *hash ) {
88 char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode, int zaction ) ) { 104 size_t seed_count = torrent->peer_list6->seed_count + torrent->peer_list4->seed_count;
89 /* Allocate a fresh output buffer at the end of our buffers list */ 105 size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count;
90 if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) { 106 size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count;
107
108 switch (mode & TASK_TASK_MASK) {
109 case TASK_FULLSCRAPE:
110 default:
111 /* push hash as bencoded string */
112 *r++ = '2';
113 *r++ = '0';
114 *r++ = ':';
115 memcpy(r, hash, sizeof(ot_hash));
116 r += sizeof(ot_hash);
117 /* push rest of the scrape string */
118 r += sprintf(r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count - seed_count);
119
120 break;
121 case TASK_FULLSCRAPE_TPB_ASCII:
122 to_hex(r, *hash);
123 r += 2 * sizeof(ot_hash);
124 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
125 break;
126 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
127 to_hex(r, *hash);
128 r += 2 * sizeof(ot_hash);
129 r += sprintf(r, ":%zd:%zd:%zd\n", seed_count, peer_count - seed_count, down_count);
130 break;
131 case TASK_FULLSCRAPE_TPB_BINARY:
132 memcpy(r, *hash, sizeof(ot_hash));
133 r += sizeof(ot_hash);
134 *(uint32_t *)(r + 0) = htonl((uint32_t)seed_count);
135 *(uint32_t *)(r + 4) = htonl((uint32_t)(peer_count - seed_count));
136 r += 8;
137 break;
138 case TASK_FULLSCRAPE_TPB_URLENCODED:
139 r += fmt_urlencoded(r, (char *)*hash, 20);
140 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
141 break;
142 case TASK_FULLSCRAPE_TRACKERSTATE:
143 to_hex(r, *hash);
144 r += 2 * sizeof(ot_hash);
145 r += sprintf(r, ":%zd:%zd\n", torrent->peer_list6->base, down_count);
146 break;
147 }
148 return r;
149}
91 150
92 /* Deallocate gzip buffers */ 151static void fullscrape_make(int taskid, ot_tasktype mode) {
93 IF_COMPRESSION( deflateEnd(strm); ) 152 int bucket;
153 char *r, *re;
154 struct iovec iovector = {NULL, 0};
94 155
95 /* Release lock on current bucket and return */ 156 /* Setup return vector... */
96 return -1; 157 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
97 } 158 if (!r)
159 return;
98 160
99 /* Adjust new end of output buffer */ 161 /* re points to low watermark */
100 *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 162 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
101 163
102 /* When compressing, we have all the bytes in output buffer */ 164 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
103#ifdef WANT_COMPRESSION_GZIP 165 r += sprintf(r, "d5:filesd");
104 if( mode & TASK_FLAG_GZIP ) { 166
105 int zres; 167 /* For each bucket... */
106 *re -= OT_SCRAPE_MAXENTRYLEN; 168 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
107 strm->next_out = (uint8_t*)*r; 169 /* Get exclusive access to that bucket */
108 strm->avail_out = OT_SCRAPE_CHUNK_SIZE; 170 ot_vector *torrents_list = mutex_bucket_lock(bucket);
109 zres = deflate( strm, zaction ); 171 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
110 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 172 size_t i;
111 fprintf( stderr, "deflate() failed while in fullscrape_increase(%d).\n", zaction ); 173
112 *r = (char*)strm->next_out; 174 /* For each torrent in this bucket.. */
175 for (i = 0; i < torrents_list->size; ++i) {
176 r = fullscrape_write_one(mode, r, torrents + i, &torrents[i].hash);
177
178 if (r > re) {
179 iovector.iov_len = r - (char *)iovector.iov_base;
180
181 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
182 free(iovector.iov_base);
183 return mutex_bucket_unlock(bucket, 0);
184 }
185 /* Allocate a fresh output buffer */
186 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
187 if (!r)
188 return mutex_bucket_unlock(bucket, 0);
189
190 /* re points to low watermark */
191 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
192 }
193 }
194
195 /* All torrents done: release lock on current bucket */
196 mutex_bucket_unlock(bucket, 0);
197
198 /* Parent thread died? */
199 if (!g_opentracker_running)
200 return;
113 } 201 }
114#endif
115 202
116 return 0; 203 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
204 r += sprintf(r, "ee");
205
206 /* Send rest of data */
207 iovector.iov_len = r - (char *)iovector.iov_base;
208 if (mutex_workqueue_pushchunked(taskid, &iovector))
209 free(iovector.iov_base);
117} 210}
118 211
119static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
120 int bucket;
121 char *r, *re;
122#ifdef WANT_COMPRESSION_GZIP 212#ifdef WANT_COMPRESSION_GZIP
123 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
124 z_stream strm;
125#endif
126 213
214static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
215 int bucket;
216 char *r;
217 struct iovec iovector = {NULL, 0};
218 int zres;
219 z_stream strm;
127 /* Setup return vector... */ 220 /* Setup return vector... */
128 *iovec_entries = 0; 221 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
129 *iovector = NULL; 222 if (!iovector.iov_base)
130 if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
131 return; 223 return;
132 224
133 /* re points to low watermark */ 225 byte_zero(&strm, sizeof(strm));
134 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 226 strm.next_out = (uint8_t *)iovector.iov_base;
227 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
228 if (deflateInit2(&strm, 7, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY) != Z_OK)
229 fprintf(stderr, "not ok.\n");
135 230
136#ifdef WANT_COMPRESSION_GZIP 231 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
137 if( mode & TASK_FLAG_GZIP ) { 232 strm.next_in = (uint8_t *)"d5:filesd";
138 re += OT_SCRAPE_MAXENTRYLEN; 233 strm.avail_in = strlen("d5:filesd");
139 byte_zero( &strm, sizeof(strm) ); 234 zres = deflate(&strm, Z_NO_FLUSH);
140 strm.next_in = (uint8_t*)compress_buffer;
141 strm.next_out = (uint8_t*)r;
142 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
143 if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
144 fprintf( stderr, "not ok.\n" );
145 r = compress_buffer;
146 } 235 }
147#endif
148
149 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
150 r += sprintf( r, "d5:filesd" );
151 236
152 /* For each bucket... */ 237 /* For each bucket... */
153 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 238 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
154 /* Get exclusive access to that bucket */ 239 /* Get exclusive access to that bucket */
155 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 240 ot_vector *torrents_list = mutex_bucket_lock(bucket);
156 size_t tor_offset; 241 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
242 size_t i;
157 243
158 /* For each torrent in this bucket.. */ 244 /* For each torrent in this bucket.. */
159 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 245 for (i = 0; i < torrents_list->size; ++i) {
160 /* Address torrents members */ 246 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
161 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; 247 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
162 ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; 248 strm.next_in = (uint8_t *)compress_buffer;
163 249 strm.avail_in = r - compress_buffer;
164 switch( mode & TASK_TASK_MASK ) { 250 zres = deflate(&strm, Z_NO_FLUSH);
165 case TASK_FULLSCRAPE: 251 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
166 default: 252 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
167 /* push hash as bencoded string */ 253
168 *r++='2'; *r++='0'; *r++=':'; 254 /* Check if there still is enough buffer left */
169 memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); 255 while (!strm.avail_out) {
170 /* push rest of the scrape string */ 256 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
171 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); 257
172 258 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
173 break; 259 free(iovector.iov_base);
174 case TASK_FULLSCRAPE_TPB_ASCII: 260 return mutex_bucket_unlock(bucket, 0);
175 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 261 }
176 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); 262 /* Allocate a fresh output buffer */
177 break; 263 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
178 case TASK_FULLSCRAPE_TPB_ASCII_PLUS: 264 if (!iovector.iov_base) {
179 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 265 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
180 r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); 266 deflateEnd(&strm);
181 break; 267 return mutex_bucket_unlock(bucket, 0);
182 case TASK_FULLSCRAPE_TPB_BINARY: 268 }
183 memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); 269 strm.next_out = (uint8_t *)iovector.iov_base;
184 *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); 270 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
185 *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); 271 zres = deflate(&strm, Z_NO_FLUSH);
186 r+=8; 272 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
187 break; 273 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
188 case TASK_FULLSCRAPE_TPB_URLENCODED:
189 r += fmt_urlencoded( r, (char *)*hash, 20 );
190 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
191 break;
192 case TASK_FULLSCRAPE_TRACKERSTATE:
193 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
194 r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count );
195 break;
196 } 274 }
275 }
197 276
198#ifdef WANT_COMPRESSION_GZIP 277 /* All torrents done: release lock on current bucket */
199 if( mode & TASK_FLAG_GZIP ) { 278 mutex_bucket_unlock(bucket, 0);
200 int zres; 279
201 strm.next_in = (uint8_t*)compress_buffer; 280 /* Parent thread died? */
202 strm.avail_in = r - compress_buffer; 281 if (!g_opentracker_running) {
203 zres = deflate( &strm, Z_NO_FLUSH ); 282 deflateEnd(&strm);
204 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 283 return;
205 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); 284 }
206 r = (char*)strm.next_out; 285 }
286
287 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
288 strm.next_in = (uint8_t *)"ee";
289 strm.avail_in = strlen("ee");
290 }
291
292 if (deflate(&strm, Z_FINISH) < Z_OK)
293 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
294
295 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
296 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
297 free(iovector.iov_base);
298 deflateEnd(&strm);
299 return;
300 }
301
302 /* Check if there's a last batch of data in the zlib buffer */
303 if (!strm.avail_out) {
304 /* Allocate a fresh output buffer */
305 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
306
307 if (!iovector.iov_base) {
308 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
309 deflateEnd(&strm);
310 return;
207 } 311 }
312 strm.next_out = iovector.iov_base;
313 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
314 if (deflate(&strm, Z_FINISH) < Z_OK)
315 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
316
317 /* Only pass the new buffer if there actually was some data left in the buffer */
318 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
319 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
320 free(iovector.iov_base);
321 }
322
323 deflateEnd(&strm);
324}
325/* WANT_COMPRESSION_GZIP */
208#endif 326#endif
209 327
210 /* Check if there still is enough buffer left */ 328#ifdef WANT_COMPRESSION_ZSTD
211 while( r >= re )
212 if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_NO_FLUSH ) ) )
213 return mutex_bucket_unlock( bucket, 0 );
214 329
215 IF_COMPRESSION( r = compress_buffer; ) 330static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
331 int bucket;
332 char *r;
333 struct iovec iovector = {NULL, 0};
334 ZSTD_CCtx *zstream = ZSTD_createCCtx();
335 ZSTD_inBuffer inbuf;
336 ZSTD_outBuffer outbuf;
337 size_t more_bytes;
338
339 if (!zstream)
340 return;
341
342 /* Setup return vector... */
343 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
344 if (!iovector.iov_base) {
345 ZSTD_freeCCtx(zstream);
346 return;
347 }
348
349 /* Working with a compression level 6 is half as fast as level 3, but
350 seems to be the last reasonable bump that's worth extra cpu */
351 ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
352
353 outbuf.dst = iovector.iov_base;
354 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
355 outbuf.pos = 0;
356
357 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
358 inbuf.src = (const void *)"d5:filesd";
359 inbuf.size = strlen("d5:filesd");
360 inbuf.pos = 0;
361 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
362 }
363
364 /* For each bucket... */
365 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
366 /* Get exclusive access to that bucket */
367 ot_vector *torrents_list = mutex_bucket_lock(bucket);
368 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
369 size_t i;
370
371 /* For each torrent in this bucket.. */
372 for (i = 0; i < torrents_list->size; ++i) {
373 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
374 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
375 inbuf.src = compress_buffer;
376 inbuf.size = r - compress_buffer;
377 inbuf.pos = 0;
378 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
379
380 /* Check if there still is enough buffer left */
381 while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
382 iovector.iov_len = outbuf.size;
383
384 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
385 free(iovector.iov_base);
386 ZSTD_freeCCtx(zstream);
387 return mutex_bucket_unlock(bucket, 0);
388 }
389 /* Allocate a fresh output buffer */
390 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
391 if (!iovector.iov_base) {
392 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
393 ZSTD_freeCCtx(zstream);
394 return mutex_bucket_unlock(bucket, 0);
395 }
396
397 outbuf.dst = iovector.iov_base;
398 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
399 outbuf.pos = 0;
400
401 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
402 }
216 } 403 }
217 404
218 /* All torrents done: release lock on current bucket */ 405 /* All torrents done: release lock on current bucket */
219 mutex_bucket_unlock( bucket, 0 ); 406 mutex_bucket_unlock(bucket, 0);
220 407
221 /* Parent thread died? */ 408 /* Parent thread died? */
222 if( !g_opentracker_running ) 409 if (!g_opentracker_running)
223 return; 410 return;
224 } 411 }
225 412
226 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) 413 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
227 r += sprintf( r, "ee" ); 414 inbuf.src = (const void *)"ee";
415 inbuf.size = strlen("ee");
416 inbuf.pos = 0;
417 }
418
419 more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
228 420
229#ifdef WANT_COMPRESSION_GZIP 421 iovector.iov_len = outbuf.pos;
230 if( mode & TASK_FLAG_GZIP ) { 422 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
231 strm.next_in = (uint8_t*)compress_buffer; 423 free(iovector.iov_base);
232 strm.avail_in = r - compress_buffer; 424 ZSTD_freeCCtx(zstream);
233 if( deflate( &strm, Z_FINISH ) < Z_OK ) 425 return;
234 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
235 r = (char*)strm.next_out;
236
237 while( r >= re )
238 if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_FINISH ) ) )
239 return mutex_bucket_unlock( bucket, 0 );
240 deflateEnd(&strm);
241 } 426 }
242#endif
243 427
244 /* Release unused memory in current output buffer */ 428 /* Check if there's a last batch of data in the zlib buffer */
245 iovec_fixlast( iovec_entries, iovector, r ); 429 if (more_bytes) {
430 /* Allocate a fresh output buffer */
431 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
432
433 if (!iovector.iov_base) {
434 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
435 ZSTD_freeCCtx(zstream);
436 return;
437 }
438
439 outbuf.dst = iovector.iov_base;
440 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
441 outbuf.pos = 0;
442
443 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
444
445 /* Only pass the new buffer if there actually was some data left in the buffer */
446 iovector.iov_len = outbuf.pos;
447 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
448 free(iovector.iov_base);
449 }
450
451 ZSTD_freeCCtx(zstream);
246} 452}
453/* WANT_COMPRESSION_ZSTD */
247#endif 454#endif
248 455
249const char *g_version_fullscrape_c = "$Source$: $Revision$\n"; 456/* WANT_FULLSCRAPE */
457#endif