diff options
Diffstat (limited to 'ot_fullscrape.c')
-rw-r--r-- | ot_fullscrape.c | 510 |
1 files changed, 359 insertions, 151 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index faea4b9..6fd6d1c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
@@ -6,14 +6,18 @@ | |||
6 | #ifdef WANT_FULLSCRAPE | 6 | #ifdef WANT_FULLSCRAPE |
7 | 7 | ||
8 | /* System */ | 8 | /* System */ |
9 | #include <sys/param.h> | 9 | #include <arpa/inet.h> |
10 | #include <pthread.h> | ||
10 | #include <stdio.h> | 11 | #include <stdio.h> |
11 | #include <string.h> | 12 | #include <string.h> |
12 | #include <pthread.h> | 13 | #include <sys/param.h> |
13 | #include <arpa/inet.h> | ||
14 | #ifdef WANT_COMPRESSION_GZIP | 14 | #ifdef WANT_COMPRESSION_GZIP |
15 | #include <zlib.h> | 15 | #include <zlib.h> |
16 | #endif | 16 | #endif |
17 | #ifdef WANT_COMPRESSION_ZSTD | ||
18 | #include <zstd.h> | ||
19 | #endif | ||
20 | |||
17 | 21 | ||
18 | /* Libowfat */ | 22 | /* Libowfat */ |
19 | #include "byte.h" | 23 | #include "byte.h" |
@@ -21,52 +25,64 @@ | |||
21 | #include "textcode.h" | 25 | #include "textcode.h" |
22 | 26 | ||
23 | /* Opentracker */ | 27 | /* Opentracker */ |
24 | #include "trackerlogic.h" | ||
25 | #include "ot_mutex.h" | ||
26 | #include "ot_iovec.h" | ||
27 | #include "ot_fullscrape.h" | 28 | #include "ot_fullscrape.h" |
29 | #include "ot_iovec.h" | ||
30 | #include "ot_mutex.h" | ||
31 | #include "trackerlogic.h" | ||
28 | 32 | ||
29 | /* Fetch full scrape info for all torrents | 33 | /* Fetch full scrape info for all torrents |
30 | Full scrapes usually are huge and one does not want to | 34 | Full scrapes usually are huge and one does not want to |
31 | allocate more memory. So lets get them in 512k units | 35 | allocate more memory. So lets get them in 512k units |
32 | */ | 36 | */ |
33 | #define OT_SCRAPE_CHUNK_SIZE (512*1024) | 37 | #define OT_SCRAPE_CHUNK_SIZE (1024 * 1024) |
34 | 38 | ||
35 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ | 39 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ |
36 | #define OT_SCRAPE_MAXENTRYLEN 256 | 40 | #define OT_SCRAPE_MAXENTRYLEN 256 |
37 | 41 | ||
42 | /* Forward declaration */ | ||
43 | static void fullscrape_make(int taskid, ot_tasktype mode); | ||
38 | #ifdef WANT_COMPRESSION_GZIP | 44 | #ifdef WANT_COMPRESSION_GZIP |
39 | #define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK | 45 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); |
40 | #define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) , param1, param2, param3 | 46 | #endif |
41 | #else | 47 | #ifdef WANT_COMPRESSION_ZSTD |
42 | #define IF_COMPRESSION( TASK ) | 48 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode); |
43 | #define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) | ||
44 | #endif | 49 | #endif |
45 | |||
46 | /* Forward declaration */ | ||
47 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | ||
48 | 50 | ||
49 | /* Converter function from memory to human readable hex strings | 51 | /* Converter function from memory to human readable hex strings |
50 | XXX - Duplicated from ot_stats. Needs fix. */ | 52 | XXX - Duplicated from ot_stats. Needs fix. */ |
51 | static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} | 53 | static char *to_hex(char *d, uint8_t *s) { |
54 | char *m = "0123456789ABCDEF"; | ||
55 | char *t = d; | ||
56 | char *e = d + 40; | ||
57 | while (d < e) { | ||
58 | *d++ = m[*s >> 4]; | ||
59 | *d++ = m[*s++ & 15]; | ||
60 | } | ||
61 | *d = 0; | ||
62 | return t; | ||
63 | } | ||
52 | 64 | ||
53 | /* This is the entry point into this worker thread | 65 | /* This is the entry point into this worker thread |
54 | It grabs tasks from mutex_tasklist and delivers results back | 66 | It grabs tasks from mutex_tasklist and delivers results back |
55 | */ | 67 | */ |
56 | static void * fullscrape_worker( void * args ) { | 68 | static void *fullscrape_worker(void *args) { |
57 | int iovec_entries; | 69 | (void)args; |
58 | struct iovec *iovector; | ||
59 | |||
60 | (void) args; | ||
61 | 70 | ||
62 | while( 1 ) { | 71 | while (g_opentracker_running) { |
63 | ot_tasktype tasktype = TASK_FULLSCRAPE; | 72 | ot_tasktype tasktype = TASK_FULLSCRAPE; |
64 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); | 73 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); |
65 | fullscrape_make( &iovec_entries, &iovector, tasktype ); | 74 | #ifdef WANT_COMPRESSION_ZSTD |
66 | if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) | 75 | if (tasktype & TASK_FLAG_ZSTD) |
67 | iovec_free( &iovec_entries, &iovector ); | 76 | fullscrape_make_zstd(taskid, tasktype); |
68 | if( !g_opentracker_running ) | 77 | else |
69 | return NULL; | 78 | #endif |
79 | #ifdef WANT_COMPRESSION_GZIP | ||
80 | if (tasktype & TASK_FLAG_GZIP) | ||
81 | fullscrape_make_gzip(taskid, tasktype); | ||
82 | else | ||
83 | #endif | ||
84 | fullscrape_make(taskid, tasktype); | ||
85 | mutex_workqueue_pushchunked(taskid, NULL); | ||
70 | } | 86 | } |
71 | return NULL; | 87 | return NULL; |
72 | } | 88 | } |
@@ -84,166 +100,358 @@ void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) { | |||
84 | mutex_workqueue_pushtask( sock, tasktype ); | 100 | mutex_workqueue_pushtask( sock, tasktype ); |
85 | } | 101 | } |
86 | 102 | ||
87 | static int fullscrape_increase( int *iovec_entries, struct iovec **iovector, | 103 | static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torrent, ot_hash *hash ) { |
88 | char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode, int zaction ) ) { | 104 | size_t seed_count = torrent->peer_list6->seed_count + torrent->peer_list4->seed_count; |
89 | /* Allocate a fresh output buffer at the end of our buffers list */ | 105 | size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count; |
90 | if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) { | 106 | size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count; |
107 | |||
108 | switch (mode & TASK_TASK_MASK) { | ||
109 | case TASK_FULLSCRAPE: | ||
110 | default: | ||
111 | /* push hash as bencoded string */ | ||
112 | *r++ = '2'; | ||
113 | *r++ = '0'; | ||
114 | *r++ = ':'; | ||
115 | memcpy(r, hash, sizeof(ot_hash)); | ||
116 | r += sizeof(ot_hash); | ||
117 | /* push rest of the scrape string */ | ||
118 | r += sprintf(r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count - seed_count); | ||
119 | |||
120 | break; | ||
121 | case TASK_FULLSCRAPE_TPB_ASCII: | ||
122 | to_hex(r, *hash); | ||
123 | r += 2 * sizeof(ot_hash); | ||
124 | r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count); | ||
125 | break; | ||
126 | case TASK_FULLSCRAPE_TPB_ASCII_PLUS: | ||
127 | to_hex(r, *hash); | ||
128 | r += 2 * sizeof(ot_hash); | ||
129 | r += sprintf(r, ":%zd:%zd:%zd\n", seed_count, peer_count - seed_count, down_count); | ||
130 | break; | ||
131 | case TASK_FULLSCRAPE_TPB_BINARY: | ||
132 | memcpy(r, *hash, sizeof(ot_hash)); | ||
133 | r += sizeof(ot_hash); | ||
134 | *(uint32_t *)(r + 0) = htonl((uint32_t)seed_count); | ||
135 | *(uint32_t *)(r + 4) = htonl((uint32_t)(peer_count - seed_count)); | ||
136 | r += 8; | ||
137 | break; | ||
138 | case TASK_FULLSCRAPE_TPB_URLENCODED: | ||
139 | r += fmt_urlencoded(r, (char *)*hash, 20); | ||
140 | r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count); | ||
141 | break; | ||
142 | case TASK_FULLSCRAPE_TRACKERSTATE: | ||
143 | to_hex(r, *hash); | ||
144 | r += 2 * sizeof(ot_hash); | ||
145 | r += sprintf(r, ":%zd:%zd\n", torrent->peer_list6->base, down_count); | ||
146 | break; | ||
147 | } | ||
148 | return r; | ||
149 | } | ||
91 | 150 | ||
92 | /* Deallocate gzip buffers */ | 151 | static void fullscrape_make(int taskid, ot_tasktype mode) { |
93 | IF_COMPRESSION( deflateEnd(strm); ) | 152 | int bucket; |
153 | char *r, *re; | ||
154 | struct iovec iovector = {NULL, 0}; | ||
94 | 155 | ||
95 | /* Release lock on current bucket and return */ | 156 | /* Setup return vector... */ |
96 | return -1; | 157 | r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
97 | } | 158 | if (!r) |
159 | return; | ||
98 | 160 | ||
99 | /* Adjust new end of output buffer */ | 161 | /* re points to low watermark */ |
100 | *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | 162 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; |
101 | 163 | ||
102 | /* When compressing, we have all the bytes in output buffer */ | 164 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) |
103 | #ifdef WANT_COMPRESSION_GZIP | 165 | r += sprintf(r, "d5:filesd"); |
104 | if( mode & TASK_FLAG_GZIP ) { | 166 | |
105 | int zres; | 167 | /* For each bucket... */ |
106 | *re -= OT_SCRAPE_MAXENTRYLEN; | 168 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { |
107 | strm->next_out = (uint8_t*)*r; | 169 | /* Get exclusive access to that bucket */ |
108 | strm->avail_out = OT_SCRAPE_CHUNK_SIZE; | 170 | ot_vector *torrents_list = mutex_bucket_lock(bucket); |
109 | zres = deflate( strm, zaction ); | 171 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); |
110 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) | 172 | size_t i; |
111 | fprintf( stderr, "deflate() failed while in fullscrape_increase(%d).\n", zaction ); | 173 | |
112 | *r = (char*)strm->next_out; | 174 | /* For each torrent in this bucket.. */ |
175 | for (i = 0; i < torrents_list->size; ++i) { | ||
176 | r = fullscrape_write_one(mode, r, torrents + i, &torrents[i].hash); | ||
177 | |||
178 | if (r > re) { | ||
179 | iovector.iov_len = r - (char *)iovector.iov_base; | ||
180 | |||
181 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
182 | free(iovector.iov_base); | ||
183 | return mutex_bucket_unlock(bucket, 0); | ||
184 | } | ||
185 | /* Allocate a fresh output buffer */ | ||
186 | r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
187 | if (!r) | ||
188 | return mutex_bucket_unlock(bucket, 0); | ||
189 | |||
190 | /* re points to low watermark */ | ||
191 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | ||
192 | } | ||
193 | } | ||
194 | |||
195 | /* All torrents done: release lock on current bucket */ | ||
196 | mutex_bucket_unlock(bucket, 0); | ||
197 | |||
198 | /* Parent thread died? */ | ||
199 | if (!g_opentracker_running) | ||
200 | return; | ||
113 | } | 201 | } |
114 | #endif | ||
115 | 202 | ||
116 | return 0; | 203 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) |
204 | r += sprintf(r, "ee"); | ||
205 | |||
206 | /* Send rest of data */ | ||
207 | iovector.iov_len = r - (char *)iovector.iov_base; | ||
208 | if (mutex_workqueue_pushchunked(taskid, &iovector)) | ||
209 | free(iovector.iov_base); | ||
117 | } | 210 | } |
118 | 211 | ||
119 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { | ||
120 | int bucket; | ||
121 | char *r, *re; | ||
122 | #ifdef WANT_COMPRESSION_GZIP | 212 | #ifdef WANT_COMPRESSION_GZIP |
123 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
124 | z_stream strm; | ||
125 | #endif | ||
126 | 213 | ||
214 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | ||
215 | int bucket; | ||
216 | char *r; | ||
217 | struct iovec iovector = {NULL, 0}; | ||
218 | int zres; | ||
219 | z_stream strm; | ||
127 | /* Setup return vector... */ | 220 | /* Setup return vector... */ |
128 | *iovec_entries = 0; | 221 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
129 | *iovector = NULL; | 222 | if (!iovector.iov_base) |
130 | if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) | ||
131 | return; | 223 | return; |
132 | 224 | ||
133 | /* re points to low watermark */ | 225 | byte_zero(&strm, sizeof(strm)); |
134 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | 226 | strm.next_out = (uint8_t *)iovector.iov_base; |
227 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
228 | if (deflateInit2(&strm, 7, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY) != Z_OK) | ||
229 | fprintf(stderr, "not ok.\n"); | ||
135 | 230 | ||
136 | #ifdef WANT_COMPRESSION_GZIP | 231 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
137 | if( mode & TASK_FLAG_GZIP ) { | 232 | strm.next_in = (uint8_t *)"d5:filesd"; |
138 | re += OT_SCRAPE_MAXENTRYLEN; | 233 | strm.avail_in = strlen("d5:filesd"); |
139 | byte_zero( &strm, sizeof(strm) ); | 234 | zres = deflate(&strm, Z_NO_FLUSH); |
140 | strm.next_in = (uint8_t*)compress_buffer; | ||
141 | strm.next_out = (uint8_t*)r; | ||
142 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
143 | if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) | ||
144 | fprintf( stderr, "not ok.\n" ); | ||
145 | r = compress_buffer; | ||
146 | } | 235 | } |
147 | #endif | ||
148 | |||
149 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) | ||
150 | r += sprintf( r, "d5:filesd" ); | ||
151 | 236 | ||
152 | /* For each bucket... */ | 237 | /* For each bucket... */ |
153 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 238 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { |
154 | /* Get exclusive access to that bucket */ | 239 | /* Get exclusive access to that bucket */ |
155 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | 240 | ot_vector *torrents_list = mutex_bucket_lock(bucket); |
156 | size_t tor_offset; | 241 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); |
242 | size_t i; | ||
157 | 243 | ||
158 | /* For each torrent in this bucket.. */ | 244 | /* For each torrent in this bucket.. */ |
159 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 245 | for (i = 0; i < torrents_list->size; ++i) { |
160 | /* Address torrents members */ | 246 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; |
161 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | 247 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); |
162 | ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; | 248 | strm.next_in = (uint8_t *)compress_buffer; |
163 | 249 | strm.avail_in = r - compress_buffer; | |
164 | switch( mode & TASK_TASK_MASK ) { | 250 | zres = deflate(&strm, Z_NO_FLUSH); |
165 | case TASK_FULLSCRAPE: | 251 | if ((zres < Z_OK) && (zres != Z_BUF_ERROR)) |
166 | default: | 252 | fprintf(stderr, "deflate() failed while in fullscrape_make().\n"); |
167 | /* push hash as bencoded string */ | 253 | |
168 | *r++='2'; *r++='0'; *r++=':'; | 254 | /* Check if there still is enough buffer left */ |
169 | memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); | 255 | while (!strm.avail_out) { |
170 | /* push rest of the scrape string */ | 256 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; |
171 | r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); | 257 | |
172 | 258 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | |
173 | break; | 259 | free(iovector.iov_base); |
174 | case TASK_FULLSCRAPE_TPB_ASCII: | 260 | return mutex_bucket_unlock(bucket, 0); |
175 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | 261 | } |
176 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); | 262 | /* Allocate a fresh output buffer */ |
177 | break; | 263 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
178 | case TASK_FULLSCRAPE_TPB_ASCII_PLUS: | 264 | if (!iovector.iov_base) { |
179 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | 265 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); |
180 | r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); | 266 | deflateEnd(&strm); |
181 | break; | 267 | return mutex_bucket_unlock(bucket, 0); |
182 | case TASK_FULLSCRAPE_TPB_BINARY: | 268 | } |
183 | memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); | 269 | strm.next_out = (uint8_t *)iovector.iov_base; |
184 | *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); | 270 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; |
185 | *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); | 271 | zres = deflate(&strm, Z_NO_FLUSH); |
186 | r+=8; | 272 | if ((zres < Z_OK) && (zres != Z_BUF_ERROR)) |
187 | break; | 273 | fprintf(stderr, "deflate() failed while in fullscrape_make().\n"); |
188 | case TASK_FULLSCRAPE_TPB_URLENCODED: | ||
189 | r += fmt_urlencoded( r, (char *)*hash, 20 ); | ||
190 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); | ||
191 | break; | ||
192 | case TASK_FULLSCRAPE_TRACKERSTATE: | ||
193 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | ||
194 | r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count ); | ||
195 | break; | ||
196 | } | 274 | } |
275 | } | ||
197 | 276 | ||
198 | #ifdef WANT_COMPRESSION_GZIP | 277 | /* All torrents done: release lock on current bucket */ |
199 | if( mode & TASK_FLAG_GZIP ) { | 278 | mutex_bucket_unlock(bucket, 0); |
200 | int zres; | 279 | |
201 | strm.next_in = (uint8_t*)compress_buffer; | 280 | /* Parent thread died? */ |
202 | strm.avail_in = r - compress_buffer; | 281 | if (!g_opentracker_running) { |
203 | zres = deflate( &strm, Z_NO_FLUSH ); | 282 | deflateEnd(&strm); |
204 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) | 283 | return; |
205 | fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); | 284 | } |
206 | r = (char*)strm.next_out; | 285 | } |
286 | |||
287 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
288 | strm.next_in = (uint8_t *)"ee"; | ||
289 | strm.avail_in = strlen("ee"); | ||
290 | } | ||
291 | |||
292 | if (deflate(&strm, Z_FINISH) < Z_OK) | ||
293 | fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n"); | ||
294 | |||
295 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; | ||
296 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
297 | free(iovector.iov_base); | ||
298 | deflateEnd(&strm); | ||
299 | return; | ||
300 | } | ||
301 | |||
302 | /* Check if there's a last batch of data in the zlib buffer */ | ||
303 | if (!strm.avail_out) { | ||
304 | /* Allocate a fresh output buffer */ | ||
305 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
306 | |||
307 | if (!iovector.iov_base) { | ||
308 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
309 | deflateEnd(&strm); | ||
310 | return; | ||
207 | } | 311 | } |
312 | strm.next_out = iovector.iov_base; | ||
313 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
314 | if (deflate(&strm, Z_FINISH) < Z_OK) | ||
315 | fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n"); | ||
316 | |||
317 | /* Only pass the new buffer if there actually was some data left in the buffer */ | ||
318 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; | ||
319 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
320 | free(iovector.iov_base); | ||
321 | } | ||
322 | |||
323 | deflateEnd(&strm); | ||
324 | } | ||
325 | /* WANT_COMPRESSION_GZIP */ | ||
208 | #endif | 326 | #endif |
209 | 327 | ||
210 | /* Check if there still is enough buffer left */ | 328 | #ifdef WANT_COMPRESSION_ZSTD |
211 | while( r >= re ) | ||
212 | if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_NO_FLUSH ) ) ) | ||
213 | return mutex_bucket_unlock( bucket, 0 ); | ||
214 | 329 | ||
215 | IF_COMPRESSION( r = compress_buffer; ) | 330 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode) { |
331 | int bucket; | ||
332 | char *r; | ||
333 | struct iovec iovector = {NULL, 0}; | ||
334 | ZSTD_CCtx *zstream = ZSTD_createCCtx(); | ||
335 | ZSTD_inBuffer inbuf; | ||
336 | ZSTD_outBuffer outbuf; | ||
337 | size_t more_bytes; | ||
338 | |||
339 | if (!zstream) | ||
340 | return; | ||
341 | |||
342 | /* Setup return vector... */ | ||
343 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
344 | if (!iovector.iov_base) { | ||
345 | ZSTD_freeCCtx(zstream); | ||
346 | return; | ||
347 | } | ||
348 | |||
349 | /* Working with a compression level 6 is half as fast as level 3, but | ||
350 | seems to be the last reasonable bump that's worth extra cpu */ | ||
351 | ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6); | ||
352 | |||
353 | outbuf.dst = iovector.iov_base; | ||
354 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
355 | outbuf.pos = 0; | ||
356 | |||
357 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
358 | inbuf.src = (const void *)"d5:filesd"; | ||
359 | inbuf.size = strlen("d5:filesd"); | ||
360 | inbuf.pos = 0; | ||
361 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
362 | } | ||
363 | |||
364 | /* For each bucket... */ | ||
365 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { | ||
366 | /* Get exclusive access to that bucket */ | ||
367 | ot_vector *torrents_list = mutex_bucket_lock(bucket); | ||
368 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); | ||
369 | size_t i; | ||
370 | |||
371 | /* For each torrent in this bucket.. */ | ||
372 | for (i = 0; i < torrents_list->size; ++i) { | ||
373 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
374 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); | ||
375 | inbuf.src = compress_buffer; | ||
376 | inbuf.size = r - compress_buffer; | ||
377 | inbuf.pos = 0; | ||
378 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
379 | |||
380 | /* Check if there still is enough buffer left */ | ||
381 | while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) { | ||
382 | iovector.iov_len = outbuf.size; | ||
383 | |||
384 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
385 | free(iovector.iov_base); | ||
386 | ZSTD_freeCCtx(zstream); | ||
387 | return mutex_bucket_unlock(bucket, 0); | ||
388 | } | ||
389 | /* Allocate a fresh output buffer */ | ||
390 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
391 | if (!iovector.iov_base) { | ||
392 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); | ||
393 | ZSTD_freeCCtx(zstream); | ||
394 | return mutex_bucket_unlock(bucket, 0); | ||
395 | } | ||
396 | |||
397 | outbuf.dst = iovector.iov_base; | ||
398 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
399 | outbuf.pos = 0; | ||
400 | |||
401 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
402 | } | ||
216 | } | 403 | } |
217 | 404 | ||
218 | /* All torrents done: release lock on current bucket */ | 405 | /* All torrents done: release lock on current bucket */ |
219 | mutex_bucket_unlock( bucket, 0 ); | 406 | mutex_bucket_unlock(bucket, 0); |
220 | 407 | ||
221 | /* Parent thread died? */ | 408 | /* Parent thread died? */ |
222 | if( !g_opentracker_running ) | 409 | if (!g_opentracker_running) |
223 | return; | 410 | return; |
224 | } | 411 | } |
225 | 412 | ||
226 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) | 413 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
227 | r += sprintf( r, "ee" ); | 414 | inbuf.src = (const void *)"ee"; |
415 | inbuf.size = strlen("ee"); | ||
416 | inbuf.pos = 0; | ||
417 | } | ||
418 | |||
419 | more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
228 | 420 | ||
229 | #ifdef WANT_COMPRESSION_GZIP | 421 | iovector.iov_len = outbuf.pos; |
230 | if( mode & TASK_FLAG_GZIP ) { | 422 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
231 | strm.next_in = (uint8_t*)compress_buffer; | 423 | free(iovector.iov_base); |
232 | strm.avail_in = r - compress_buffer; | 424 | ZSTD_freeCCtx(zstream); |
233 | if( deflate( &strm, Z_FINISH ) < Z_OK ) | 425 | return; |
234 | fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); | ||
235 | r = (char*)strm.next_out; | ||
236 | |||
237 | while( r >= re ) | ||
238 | if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_FINISH ) ) ) | ||
239 | return mutex_bucket_unlock( bucket, 0 ); | ||
240 | deflateEnd(&strm); | ||
241 | } | 426 | } |
242 | #endif | ||
243 | 427 | ||
244 | /* Release unused memory in current output buffer */ | 428 | /* Check if there's a last batch of data in the zlib buffer */ |
245 | iovec_fixlast( iovec_entries, iovector, r ); | 429 | if (more_bytes) { |
430 | /* Allocate a fresh output buffer */ | ||
431 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
432 | |||
433 | if (!iovector.iov_base) { | ||
434 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
435 | ZSTD_freeCCtx(zstream); | ||
436 | return; | ||
437 | } | ||
438 | |||
439 | outbuf.dst = iovector.iov_base; | ||
440 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
441 | outbuf.pos = 0; | ||
442 | |||
443 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
444 | |||
445 | /* Only pass the new buffer if there actually was some data left in the buffer */ | ||
446 | iovector.iov_len = outbuf.pos; | ||
447 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
448 | free(iovector.iov_base); | ||
449 | } | ||
450 | |||
451 | ZSTD_freeCCtx(zstream); | ||
246 | } | 452 | } |
453 | /* WANT_COMPRESSION_ZSTD */ | ||
247 | #endif | 454 | #endif |
248 | 455 | ||
249 | const char *g_version_fullscrape_c = "$Source$: $Revision$\n"; | 456 | /* WANT_FULLSCRAPE */ |
457 | #endif | ||