summaryrefslogtreecommitdiff
path: root/ot_fullscrape.c
diff options
context:
space:
mode:
Diffstat (limited to 'ot_fullscrape.c')
-rw-r--r--ot_fullscrape.c470
1 files changed, 322 insertions, 148 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index 5d115dc..6fd6d1c 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -6,14 +6,18 @@
6#ifdef WANT_FULLSCRAPE 6#ifdef WANT_FULLSCRAPE
7 7
8/* System */ 8/* System */
9#include <sys/param.h> 9#include <arpa/inet.h>
10#include <pthread.h>
10#include <stdio.h> 11#include <stdio.h>
11#include <string.h> 12#include <string.h>
12#include <pthread.h> 13#include <sys/param.h>
13#include <arpa/inet.h>
14#ifdef WANT_COMPRESSION_GZIP 14#ifdef WANT_COMPRESSION_GZIP
15#include <zlib.h> 15#include <zlib.h>
16#endif 16#endif
17#ifdef WANT_COMPRESSION_ZSTD
18#include <zstd.h>
19#endif
20
17 21
18/* Libowfat */ 22/* Libowfat */
19#include "byte.h" 23#include "byte.h"
@@ -21,50 +25,64 @@
21#include "textcode.h" 25#include "textcode.h"
22 26
23/* Opentracker */ 27/* Opentracker */
24#include "trackerlogic.h"
25#include "ot_mutex.h"
26#include "ot_iovec.h"
27#include "ot_fullscrape.h" 28#include "ot_fullscrape.h"
29#include "ot_iovec.h"
30#include "ot_mutex.h"
31#include "trackerlogic.h"
28 32
29/* Fetch full scrape info for all torrents 33/* Fetch full scrape info for all torrents
30 Full scrapes usually are huge and one does not want to 34 Full scrapes usually are huge and one does not want to
31 allocate more memory. So lets get them in 512k units 35 allocate more memory. So lets get them in 512k units
32*/ 36*/
33#define OT_SCRAPE_CHUNK_SIZE (1024*1024) 37#define OT_SCRAPE_CHUNK_SIZE (1024 * 1024)
34 38
35/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ 39/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
36#define OT_SCRAPE_MAXENTRYLEN 256 40#define OT_SCRAPE_MAXENTRYLEN 256
37 41
38/* Forward declaration */ 42/* Forward declaration */
39static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); 43static void fullscrape_make(int taskid, ot_tasktype mode);
40#ifdef WANT_COMPRESSION_GZIP 44#ifdef WANT_COMPRESSION_GZIP
41static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); 45static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
46#endif
47#ifdef WANT_COMPRESSION_ZSTD
48static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
42#endif 49#endif
43 50
44/* Converter function from memory to human readable hex strings 51/* Converter function from memory to human readable hex strings
45 XXX - Duplicated from ot_stats. Needs fix. */ 52 XXX - Duplicated from ot_stats. Needs fix. */
46static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} 53static char *to_hex(char *d, uint8_t *s) {
54 char *m = "0123456789ABCDEF";
55 char *t = d;
56 char *e = d + 40;
57 while (d < e) {
58 *d++ = m[*s >> 4];
59 *d++ = m[*s++ & 15];
60 }
61 *d = 0;
62 return t;
63}
47 64
48/* This is the entry point into this worker thread 65/* This is the entry point into this worker thread
49 It grabs tasks from mutex_tasklist and delivers results back 66 It grabs tasks from mutex_tasklist and delivers results back
50*/ 67*/
51static void * fullscrape_worker( void * args ) { 68static void *fullscrape_worker(void *args) {
52 int iovec_entries; 69 (void)args;
53 struct iovec *iovector;
54 70
55 (void) args; 71 while (g_opentracker_running) {
56
57 while( g_opentracker_running ) {
58 ot_tasktype tasktype = TASK_FULLSCRAPE; 72 ot_tasktype tasktype = TASK_FULLSCRAPE;
59 ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); 73 ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
74#ifdef WANT_COMPRESSION_ZSTD
75 if (tasktype & TASK_FLAG_ZSTD)
76 fullscrape_make_zstd(taskid, tasktype);
77 else
78#endif
60#ifdef WANT_COMPRESSION_GZIP 79#ifdef WANT_COMPRESSION_GZIP
61 if (tasktype & TASK_FLAG_GZIP) 80 if (tasktype & TASK_FLAG_GZIP)
62 fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); 81 fullscrape_make_gzip(taskid, tasktype);
63 else 82 else
64#endif 83#endif
65 fullscrape_make( &iovec_entries, &iovector, tasktype ); 84 fullscrape_make(taskid, tasktype);
66 if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) 85 mutex_workqueue_pushchunked(taskid, NULL);
67 iovec_free( &iovec_entries, &iovector );
68 } 86 }
69 return NULL; 87 return NULL;
70} 88}
@@ -82,76 +100,92 @@ void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
82 mutex_workqueue_pushtask( sock, tasktype ); 100 mutex_workqueue_pushtask( sock, tasktype );
83} 101}
84 102
85static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_peerlist *peer_list, ot_hash *hash ) { 103static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torrent, ot_hash *hash ) {
86 switch( mode & TASK_TASK_MASK ) { 104 size_t seed_count = torrent->peer_list6->seed_count + torrent->peer_list4->seed_count;
87 case TASK_FULLSCRAPE: 105 size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count;
88 default: 106 size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count;
89 /* push hash as bencoded string */ 107
90 *r++='2'; *r++='0'; *r++=':'; 108 switch (mode & TASK_TASK_MASK) {
91 memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); 109 case TASK_FULLSCRAPE:
92 /* push rest of the scrape string */ 110 default:
93 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); 111 /* push hash as bencoded string */
94 112 *r++ = '2';
95 break; 113 *r++ = '0';
96 case TASK_FULLSCRAPE_TPB_ASCII: 114 *r++ = ':';
97 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 115 memcpy(r, hash, sizeof(ot_hash));
98 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); 116 r += sizeof(ot_hash);
99 break; 117 /* push rest of the scrape string */
100 case TASK_FULLSCRAPE_TPB_ASCII_PLUS: 118 r += sprintf(r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count - seed_count);
101 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 119
102 r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); 120 break;
103 break; 121 case TASK_FULLSCRAPE_TPB_ASCII:
104 case TASK_FULLSCRAPE_TPB_BINARY: 122 to_hex(r, *hash);
105 memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); 123 r += 2 * sizeof(ot_hash);
106 *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); 124 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
107 *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); 125 break;
108 r+=8; 126 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
109 break; 127 to_hex(r, *hash);
110 case TASK_FULLSCRAPE_TPB_URLENCODED: 128 r += 2 * sizeof(ot_hash);
111 r += fmt_urlencoded( r, (char *)*hash, 20 ); 129 r += sprintf(r, ":%zd:%zd:%zd\n", seed_count, peer_count - seed_count, down_count);
112 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); 130 break;
113 break; 131 case TASK_FULLSCRAPE_TPB_BINARY:
114 case TASK_FULLSCRAPE_TRACKERSTATE: 132 memcpy(r, *hash, sizeof(ot_hash));
115 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); 133 r += sizeof(ot_hash);
116 r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count ); 134 *(uint32_t *)(r + 0) = htonl((uint32_t)seed_count);
117 break; 135 *(uint32_t *)(r + 4) = htonl((uint32_t)(peer_count - seed_count));
118 } 136 r += 8;
119 return r; 137 break;
138 case TASK_FULLSCRAPE_TPB_URLENCODED:
139 r += fmt_urlencoded(r, (char *)*hash, 20);
140 r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count);
141 break;
142 case TASK_FULLSCRAPE_TRACKERSTATE:
143 to_hex(r, *hash);
144 r += 2 * sizeof(ot_hash);
145 r += sprintf(r, ":%zd:%zd\n", torrent->peer_list6->base, down_count);
146 break;
147 }
148 return r;
120} 149}
121 150
122static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { 151static void fullscrape_make(int taskid, ot_tasktype mode) {
123 int bucket; 152 int bucket;
124 char *r, *re; 153 char *r, *re;
154 struct iovec iovector = {NULL, 0};
125 155
126 /* Setup return vector... */ 156 /* Setup return vector... */
127 *iovec_entries = 0; 157 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
128 *iovector = NULL; 158 if (!r)
129 r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE );
130 if( !r )
131 return; 159 return;
132 160
133 /* re points to low watermark */ 161 /* re points to low watermark */
134 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 162 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
135 163
136 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) 164 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
137 r += sprintf( r, "d5:filesd" ); 165 r += sprintf(r, "d5:filesd");
138 166
139 /* For each bucket... */ 167 /* For each bucket... */
140 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 168 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
141 /* Get exclusive access to that bucket */ 169 /* Get exclusive access to that bucket */
142 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 170 ot_vector *torrents_list = mutex_bucket_lock(bucket);
143 ot_torrent *torrents = (ot_torrent*)(torrents_list->data); 171 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
144 size_t i; 172 size_t i;
145 173
146 /* For each torrent in this bucket.. */ 174 /* For each torrent in this bucket.. */
147 for( i=0; i<torrents_list->size; ++i ) { 175 for (i = 0; i < torrents_list->size; ++i) {
148 r = fullscrape_write_one( mode, r, torrents[i].peer_list, &torrents[i].hash ); 176 r = fullscrape_write_one(mode, r, torrents + i, &torrents[i].hash);
149 177
150 if( r > re) { 178 if (r > re) {
151 /* Allocate a fresh output buffer at the end of our buffers list */ 179 iovector.iov_len = r - (char *)iovector.iov_base;
152 r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); 180
153 if( !r ) 181 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
154 return mutex_bucket_unlock( bucket, 0 ); 182 free(iovector.iov_base);
183 return mutex_bucket_unlock(bucket, 0);
184 }
185 /* Allocate a fresh output buffer */
186 r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
187 if (!r)
188 return mutex_bucket_unlock(bucket, 0);
155 189
156 /* re points to low watermark */ 190 /* re points to low watermark */
157 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 191 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
@@ -159,125 +193,265 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
159 } 193 }
160 194
161 /* All torrents done: release lock on current bucket */ 195 /* All torrents done: release lock on current bucket */
162 mutex_bucket_unlock( bucket, 0 ); 196 mutex_bucket_unlock(bucket, 0);
163 197
164 /* Parent thread died? */ 198 /* Parent thread died? */
165 if( !g_opentracker_running ) 199 if (!g_opentracker_running)
166 return; 200 return;
167 } 201 }
168 202
169 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) 203 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE)
170 r += sprintf( r, "ee" ); 204 r += sprintf(r, "ee");
171 205
172 /* Release unused memory in current output buffer */ 206 /* Send rest of data */
173 iovec_fixlast( iovec_entries, iovector, r ); 207 iovector.iov_len = r - (char *)iovector.iov_base;
208 if (mutex_workqueue_pushchunked(taskid, &iovector))
209 free(iovector.iov_base);
174} 210}
175 211
176#ifdef WANT_COMPRESSION_GZIP 212#ifdef WANT_COMPRESSION_GZIP
177 213
178static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { 214static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
179 int bucket; 215 int bucket;
180 char *r; 216 char *r;
181 int zres; 217 struct iovec iovector = {NULL, 0};
182 z_stream strm; 218 int zres;
183 219 z_stream strm;
184 /* Setup return vector... */ 220 /* Setup return vector... */
185 *iovec_entries = 0; 221 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
186 *iovector = NULL; 222 if (!iovector.iov_base)
187 r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE );
188 if( !r )
189 return; 223 return;
190 224
191 byte_zero( &strm, sizeof(strm) ); 225 byte_zero(&strm, sizeof(strm));
192 strm.next_out = (uint8_t*)r; 226 strm.next_out = (uint8_t *)iovector.iov_base;
193 strm.avail_out = OT_SCRAPE_CHUNK_SIZE; 227 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
194 if( deflateInit2(&strm,7,Z_DEFLATED,31,9,Z_DEFAULT_STRATEGY) != Z_OK ) 228 if (deflateInit2(&strm, 7, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY) != Z_OK)
195 fprintf( stderr, "not ok.\n" ); 229 fprintf(stderr, "not ok.\n");
196 230
197 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { 231 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
198 strm.next_in = (uint8_t*)"d5:filesd"; 232 strm.next_in = (uint8_t *)"d5:filesd";
199 strm.avail_in = strlen("d5:filesd"); 233 strm.avail_in = strlen("d5:filesd");
200 zres = deflate( &strm, Z_NO_FLUSH ); 234 zres = deflate(&strm, Z_NO_FLUSH);
201 } 235 }
202 236
203 /* For each bucket... */ 237 /* For each bucket... */
204 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 238 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
205 /* Get exclusive access to that bucket */ 239 /* Get exclusive access to that bucket */
206 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 240 ot_vector *torrents_list = mutex_bucket_lock(bucket);
207 ot_torrent *torrents = (ot_torrent*)(torrents_list->data); 241 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
208 size_t i; 242 size_t i;
209 243
210 /* For each torrent in this bucket.. */ 244 /* For each torrent in this bucket.. */
211 for( i=0; i<torrents_list->size; ++i ) { 245 for (i = 0; i < torrents_list->size; ++i) {
212 char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; 246 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
213 r = fullscrape_write_one( mode, compress_buffer, torrents[i].peer_list, &torrents[i].hash ); 247 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
214 strm.next_in = (uint8_t*)compress_buffer; 248 strm.next_in = (uint8_t *)compress_buffer;
215 strm.avail_in = r - compress_buffer; 249 strm.avail_in = r - compress_buffer;
216 zres = deflate( &strm, Z_NO_FLUSH ); 250 zres = deflate(&strm, Z_NO_FLUSH);
217 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 251 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
218 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); 252 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
219 253
220 /* Check if there still is enough buffer left */ 254 /* Check if there still is enough buffer left */
221 while( !strm.avail_out ) { 255 while (!strm.avail_out) {
222 /* Allocate a fresh output buffer at the end of our buffers list */ 256 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
223 r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); 257
224 if( !r ) { 258 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
225 fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); 259 free(iovector.iov_base);
226 iovec_free( iovec_entries, iovector ); 260 return mutex_bucket_unlock(bucket, 0);
261 }
262 /* Allocate a fresh output buffer */
263 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
264 if (!iovector.iov_base) {
265 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
227 deflateEnd(&strm); 266 deflateEnd(&strm);
228 return mutex_bucket_unlock( bucket, 0 ); 267 return mutex_bucket_unlock(bucket, 0);
229 } 268 }
230 strm.next_out = (uint8_t*)r; 269 strm.next_out = (uint8_t *)iovector.iov_base;
231 strm.avail_out = OT_SCRAPE_CHUNK_SIZE; 270 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
232 zres = deflate( &strm, Z_NO_FLUSH ); 271 zres = deflate(&strm, Z_NO_FLUSH);
233 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 272 if ((zres < Z_OK) && (zres != Z_BUF_ERROR))
234 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); 273 fprintf(stderr, "deflate() failed while in fullscrape_make().\n");
235 } 274 }
236 } 275 }
237 276
238 /* All torrents done: release lock on current bucket */ 277 /* All torrents done: release lock on current bucket */
239 mutex_bucket_unlock( bucket, 0 ); 278 mutex_bucket_unlock(bucket, 0);
240 279
241 /* Parent thread died? */ 280 /* Parent thread died? */
242 if( !g_opentracker_running ) 281 if (!g_opentracker_running) {
282 deflateEnd(&strm);
243 return; 283 return;
284 }
244 } 285 }
245 286
246 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { 287 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
247 strm.next_in = (uint8_t*)"ee"; 288 strm.next_in = (uint8_t *)"ee";
248 strm.avail_in = strlen("ee"); 289 strm.avail_in = strlen("ee");
249 } 290 }
250 291
251 if( deflate( &strm, Z_FINISH ) < Z_OK ) 292 if (deflate(&strm, Z_FINISH) < Z_OK)
252 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); 293 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
253
254 if( !strm.avail_out ) {
255 unsigned int pending;
256 int bits;
257 deflatePending( &strm, &pending, &bits);
258 pending += ( bits ? 1 : 0 );
259 294
260 /* Allocate a fresh output buffer at the end of our buffers list */ 295 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
261 r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, pending ); 296 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
262 if( !r ) { 297 free(iovector.iov_base);
263 fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); 298 deflateEnd(&strm);
264 deflateEnd(&strm); 299 return;
265 return mutex_bucket_unlock( bucket, 0 );
266 }
267 strm.next_out = (uint8_t*)r;
268 strm.avail_out = pending;
269 if( deflate( &strm, Z_FINISH ) < Z_OK )
270 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
271 } 300 }
272 301
273 /* Release unused memory in current output buffer */ 302 /* Check if there's a last batch of data in the zlib buffer */
274 iovec_fixlast( iovec_entries, iovector, strm.next_out ); 303 if (!strm.avail_out) {
304 /* Allocate a fresh output buffer */
305 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
306
307 if (!iovector.iov_base) {
308 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
309 deflateEnd(&strm);
310 return;
311 }
312 strm.next_out = iovector.iov_base;
313 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
314 if (deflate(&strm, Z_FINISH) < Z_OK)
315 fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n");
316
317 /* Only pass the new buffer if there actually was some data left in the buffer */
318 iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
319 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
320 free(iovector.iov_base);
321 }
275 322
276 deflateEnd(&strm); 323 deflateEnd(&strm);
277} 324}
278/* WANT_COMPRESSION_GZIP */ 325/* WANT_COMPRESSION_GZIP */
279#endif 326#endif
280 327
328#ifdef WANT_COMPRESSION_ZSTD
329
330static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
331 int bucket;
332 char *r;
333 struct iovec iovector = {NULL, 0};
334 ZSTD_CCtx *zstream = ZSTD_createCCtx();
335 ZSTD_inBuffer inbuf;
336 ZSTD_outBuffer outbuf;
337 size_t more_bytes;
338
339 if (!zstream)
340 return;
341
342 /* Setup return vector... */
343 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
344 if (!iovector.iov_base) {
345 ZSTD_freeCCtx(zstream);
346 return;
347 }
348
349 /* Working with a compression level 6 is half as fast as level 3, but
350 seems to be the last reasonable bump that's worth extra cpu */
351 ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
352
353 outbuf.dst = iovector.iov_base;
354 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
355 outbuf.pos = 0;
356
357 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
358 inbuf.src = (const void *)"d5:filesd";
359 inbuf.size = strlen("d5:filesd");
360 inbuf.pos = 0;
361 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
362 }
363
364 /* For each bucket... */
365 for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
366 /* Get exclusive access to that bucket */
367 ot_vector *torrents_list = mutex_bucket_lock(bucket);
368 ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
369 size_t i;
370
371 /* For each torrent in this bucket.. */
372 for (i = 0; i < torrents_list->size; ++i) {
373 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
374 r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
375 inbuf.src = compress_buffer;
376 inbuf.size = r - compress_buffer;
377 inbuf.pos = 0;
378 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
379
380 /* Check if there still is enough buffer left */
381 while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
382 iovector.iov_len = outbuf.size;
383
384 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
385 free(iovector.iov_base);
386 ZSTD_freeCCtx(zstream);
387 return mutex_bucket_unlock(bucket, 0);
388 }
389 /* Allocate a fresh output buffer */
390 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
391 if (!iovector.iov_base) {
392 fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
393 ZSTD_freeCCtx(zstream);
394 return mutex_bucket_unlock(bucket, 0);
395 }
396
397 outbuf.dst = iovector.iov_base;
398 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
399 outbuf.pos = 0;
400
401 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
402 }
403 }
404
405 /* All torrents done: release lock on current bucket */
406 mutex_bucket_unlock(bucket, 0);
407
408 /* Parent thread died? */
409 if (!g_opentracker_running)
410 return;
411 }
412
413 if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
414 inbuf.src = (const void *)"ee";
415 inbuf.size = strlen("ee");
416 inbuf.pos = 0;
417 }
418
419 more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
420
421 iovector.iov_len = outbuf.pos;
422 if (mutex_workqueue_pushchunked(taskid, &iovector)) {
423 free(iovector.iov_base);
424 ZSTD_freeCCtx(zstream);
425 return;
426 }
427
428 /* Check if there's a last batch of data in the zlib buffer */
429 if (more_bytes) {
430 /* Allocate a fresh output buffer */
431 iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
432
433 if (!iovector.iov_base) {
434 fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
435 ZSTD_freeCCtx(zstream);
436 return;
437 }
438
439 outbuf.dst = iovector.iov_base;
440 outbuf.size = OT_SCRAPE_CHUNK_SIZE;
441 outbuf.pos = 0;
442
443 ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
444
445 /* Only pass the new buffer if there actually was some data left in the buffer */
446 iovector.iov_len = outbuf.pos;
447 if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
448 free(iovector.iov_base);
449 }
450
451 ZSTD_freeCCtx(zstream);
452}
453/* WANT_COMPRESSION_ZSTD */
454#endif
455
281/* WANT_FULLSCRAPE */ 456/* WANT_FULLSCRAPE */
282#endif 457#endif
283const char *g_version_fullscrape_c = "$Source$: $Revision$\n";