diff options
Diffstat (limited to 'ot_fullscrape.c')
-rw-r--r-- | ot_fullscrape.c | 470 |
1 files changed, 322 insertions, 148 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index 5d115dc..6fd6d1c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
@@ -6,14 +6,18 @@ | |||
6 | #ifdef WANT_FULLSCRAPE | 6 | #ifdef WANT_FULLSCRAPE |
7 | 7 | ||
8 | /* System */ | 8 | /* System */ |
9 | #include <sys/param.h> | 9 | #include <arpa/inet.h> |
10 | #include <pthread.h> | ||
10 | #include <stdio.h> | 11 | #include <stdio.h> |
11 | #include <string.h> | 12 | #include <string.h> |
12 | #include <pthread.h> | 13 | #include <sys/param.h> |
13 | #include <arpa/inet.h> | ||
14 | #ifdef WANT_COMPRESSION_GZIP | 14 | #ifdef WANT_COMPRESSION_GZIP |
15 | #include <zlib.h> | 15 | #include <zlib.h> |
16 | #endif | 16 | #endif |
17 | #ifdef WANT_COMPRESSION_ZSTD | ||
18 | #include <zstd.h> | ||
19 | #endif | ||
20 | |||
17 | 21 | ||
18 | /* Libowfat */ | 22 | /* Libowfat */ |
19 | #include "byte.h" | 23 | #include "byte.h" |
@@ -21,50 +25,64 @@ | |||
21 | #include "textcode.h" | 25 | #include "textcode.h" |
22 | 26 | ||
23 | /* Opentracker */ | 27 | /* Opentracker */ |
24 | #include "trackerlogic.h" | ||
25 | #include "ot_mutex.h" | ||
26 | #include "ot_iovec.h" | ||
27 | #include "ot_fullscrape.h" | 28 | #include "ot_fullscrape.h" |
29 | #include "ot_iovec.h" | ||
30 | #include "ot_mutex.h" | ||
31 | #include "trackerlogic.h" | ||
28 | 32 | ||
29 | /* Fetch full scrape info for all torrents | 33 | /* Fetch full scrape info for all torrents |
30 | Full scrapes usually are huge and one does not want to | 34 | Full scrapes usually are huge and one does not want to |
31 | allocate more memory. So lets get them in 512k units | 35 | allocate more memory. So lets get them in 512k units |
32 | */ | 36 | */ |
33 | #define OT_SCRAPE_CHUNK_SIZE (1024*1024) | 37 | #define OT_SCRAPE_CHUNK_SIZE (1024 * 1024) |
34 | 38 | ||
35 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ | 39 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ |
36 | #define OT_SCRAPE_MAXENTRYLEN 256 | 40 | #define OT_SCRAPE_MAXENTRYLEN 256 |
37 | 41 | ||
38 | /* Forward declaration */ | 42 | /* Forward declaration */ |
39 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 43 | static void fullscrape_make(int taskid, ot_tasktype mode); |
40 | #ifdef WANT_COMPRESSION_GZIP | 44 | #ifdef WANT_COMPRESSION_GZIP |
41 | static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 45 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); |
46 | #endif | ||
47 | #ifdef WANT_COMPRESSION_ZSTD | ||
48 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode); | ||
42 | #endif | 49 | #endif |
43 | 50 | ||
44 | /* Converter function from memory to human readable hex strings | 51 | /* Converter function from memory to human readable hex strings |
45 | XXX - Duplicated from ot_stats. Needs fix. */ | 52 | XXX - Duplicated from ot_stats. Needs fix. */ |
46 | static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=d+40;while(d<e){*d++=m[*s>>4];*d++=m[*s++&15];}*d=0;return t;} | 53 | static char *to_hex(char *d, uint8_t *s) { |
54 | char *m = "0123456789ABCDEF"; | ||
55 | char *t = d; | ||
56 | char *e = d + 40; | ||
57 | while (d < e) { | ||
58 | *d++ = m[*s >> 4]; | ||
59 | *d++ = m[*s++ & 15]; | ||
60 | } | ||
61 | *d = 0; | ||
62 | return t; | ||
63 | } | ||
47 | 64 | ||
48 | /* This is the entry point into this worker thread | 65 | /* This is the entry point into this worker thread |
49 | It grabs tasks from mutex_tasklist and delivers results back | 66 | It grabs tasks from mutex_tasklist and delivers results back |
50 | */ | 67 | */ |
51 | static void * fullscrape_worker( void * args ) { | 68 | static void *fullscrape_worker(void *args) { |
52 | int iovec_entries; | 69 | (void)args; |
53 | struct iovec *iovector; | ||
54 | 70 | ||
55 | (void) args; | 71 | while (g_opentracker_running) { |
56 | |||
57 | while( g_opentracker_running ) { | ||
58 | ot_tasktype tasktype = TASK_FULLSCRAPE; | 72 | ot_tasktype tasktype = TASK_FULLSCRAPE; |
59 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); | 73 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); |
74 | #ifdef WANT_COMPRESSION_ZSTD | ||
75 | if (tasktype & TASK_FLAG_ZSTD) | ||
76 | fullscrape_make_zstd(taskid, tasktype); | ||
77 | else | ||
78 | #endif | ||
60 | #ifdef WANT_COMPRESSION_GZIP | 79 | #ifdef WANT_COMPRESSION_GZIP |
61 | if (tasktype & TASK_FLAG_GZIP) | 80 | if (tasktype & TASK_FLAG_GZIP) |
62 | fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); | 81 | fullscrape_make_gzip(taskid, tasktype); |
63 | else | 82 | else |
64 | #endif | 83 | #endif |
65 | fullscrape_make( &iovec_entries, &iovector, tasktype ); | 84 | fullscrape_make(taskid, tasktype); |
66 | if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) | 85 | mutex_workqueue_pushchunked(taskid, NULL); |
67 | iovec_free( &iovec_entries, &iovector ); | ||
68 | } | 86 | } |
69 | return NULL; | 87 | return NULL; |
70 | } | 88 | } |
@@ -82,76 +100,92 @@ void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) { | |||
82 | mutex_workqueue_pushtask( sock, tasktype ); | 100 | mutex_workqueue_pushtask( sock, tasktype ); |
83 | } | 101 | } |
84 | 102 | ||
85 | static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_peerlist *peer_list, ot_hash *hash ) { | 103 | static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torrent, ot_hash *hash ) { |
86 | switch( mode & TASK_TASK_MASK ) { | 104 | size_t seed_count = torrent->peer_list6->seed_count + torrent->peer_list4->seed_count; |
87 | case TASK_FULLSCRAPE: | 105 | size_t peer_count = torrent->peer_list6->peer_count + torrent->peer_list4->peer_count; |
88 | default: | 106 | size_t down_count = torrent->peer_list6->down_count + torrent->peer_list4->down_count; |
89 | /* push hash as bencoded string */ | 107 | |
90 | *r++='2'; *r++='0'; *r++=':'; | 108 | switch (mode & TASK_TASK_MASK) { |
91 | memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); | 109 | case TASK_FULLSCRAPE: |
92 | /* push rest of the scrape string */ | 110 | default: |
93 | r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); | 111 | /* push hash as bencoded string */ |
94 | 112 | *r++ = '2'; | |
95 | break; | 113 | *r++ = '0'; |
96 | case TASK_FULLSCRAPE_TPB_ASCII: | 114 | *r++ = ':'; |
97 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | 115 | memcpy(r, hash, sizeof(ot_hash)); |
98 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); | 116 | r += sizeof(ot_hash); |
99 | break; | 117 | /* push rest of the scrape string */ |
100 | case TASK_FULLSCRAPE_TPB_ASCII_PLUS: | 118 | r += sprintf(r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", seed_count, down_count, peer_count - seed_count); |
101 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | 119 | |
102 | r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); | 120 | break; |
103 | break; | 121 | case TASK_FULLSCRAPE_TPB_ASCII: |
104 | case TASK_FULLSCRAPE_TPB_BINARY: | 122 | to_hex(r, *hash); |
105 | memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); | 123 | r += 2 * sizeof(ot_hash); |
106 | *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); | 124 | r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count); |
107 | *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); | 125 | break; |
108 | r+=8; | 126 | case TASK_FULLSCRAPE_TPB_ASCII_PLUS: |
109 | break; | 127 | to_hex(r, *hash); |
110 | case TASK_FULLSCRAPE_TPB_URLENCODED: | 128 | r += 2 * sizeof(ot_hash); |
111 | r += fmt_urlencoded( r, (char *)*hash, 20 ); | 129 | r += sprintf(r, ":%zd:%zd:%zd\n", seed_count, peer_count - seed_count, down_count); |
112 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); | 130 | break; |
113 | break; | 131 | case TASK_FULLSCRAPE_TPB_BINARY: |
114 | case TASK_FULLSCRAPE_TRACKERSTATE: | 132 | memcpy(r, *hash, sizeof(ot_hash)); |
115 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | 133 | r += sizeof(ot_hash); |
116 | r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count ); | 134 | *(uint32_t *)(r + 0) = htonl((uint32_t)seed_count); |
117 | break; | 135 | *(uint32_t *)(r + 4) = htonl((uint32_t)(peer_count - seed_count)); |
118 | } | 136 | r += 8; |
119 | return r; | 137 | break; |
138 | case TASK_FULLSCRAPE_TPB_URLENCODED: | ||
139 | r += fmt_urlencoded(r, (char *)*hash, 20); | ||
140 | r += sprintf(r, ":%zd:%zd\n", seed_count, peer_count - seed_count); | ||
141 | break; | ||
142 | case TASK_FULLSCRAPE_TRACKERSTATE: | ||
143 | to_hex(r, *hash); | ||
144 | r += 2 * sizeof(ot_hash); | ||
145 | r += sprintf(r, ":%zd:%zd\n", torrent->peer_list6->base, down_count); | ||
146 | break; | ||
147 | } | ||
148 | return r; | ||
120 | } | 149 | } |
121 | 150 | ||
122 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { | 151 | static void fullscrape_make(int taskid, ot_tasktype mode) { |
123 | int bucket; | 152 | int bucket; |
124 | char *r, *re; | 153 | char *r, *re; |
154 | struct iovec iovector = {NULL, 0}; | ||
125 | 155 | ||
126 | /* Setup return vector... */ | 156 | /* Setup return vector... */ |
127 | *iovec_entries = 0; | 157 | r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
128 | *iovector = NULL; | 158 | if (!r) |
129 | r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); | ||
130 | if( !r ) | ||
131 | return; | 159 | return; |
132 | 160 | ||
133 | /* re points to low watermark */ | 161 | /* re points to low watermark */ |
134 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | 162 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; |
135 | 163 | ||
136 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) | 164 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) |
137 | r += sprintf( r, "d5:filesd" ); | 165 | r += sprintf(r, "d5:filesd"); |
138 | 166 | ||
139 | /* For each bucket... */ | 167 | /* For each bucket... */ |
140 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 168 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { |
141 | /* Get exclusive access to that bucket */ | 169 | /* Get exclusive access to that bucket */ |
142 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | 170 | ot_vector *torrents_list = mutex_bucket_lock(bucket); |
143 | ot_torrent *torrents = (ot_torrent*)(torrents_list->data); | 171 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); |
144 | size_t i; | 172 | size_t i; |
145 | 173 | ||
146 | /* For each torrent in this bucket.. */ | 174 | /* For each torrent in this bucket.. */ |
147 | for( i=0; i<torrents_list->size; ++i ) { | 175 | for (i = 0; i < torrents_list->size; ++i) { |
148 | r = fullscrape_write_one( mode, r, torrents[i].peer_list, &torrents[i].hash ); | 176 | r = fullscrape_write_one(mode, r, torrents + i, &torrents[i].hash); |
149 | 177 | ||
150 | if( r > re) { | 178 | if (r > re) { |
151 | /* Allocate a fresh output buffer at the end of our buffers list */ | 179 | iovector.iov_len = r - (char *)iovector.iov_base; |
152 | r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); | 180 | |
153 | if( !r ) | 181 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
154 | return mutex_bucket_unlock( bucket, 0 ); | 182 | free(iovector.iov_base); |
183 | return mutex_bucket_unlock(bucket, 0); | ||
184 | } | ||
185 | /* Allocate a fresh output buffer */ | ||
186 | r = iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
187 | if (!r) | ||
188 | return mutex_bucket_unlock(bucket, 0); | ||
155 | 189 | ||
156 | /* re points to low watermark */ | 190 | /* re points to low watermark */ |
157 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | 191 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; |
@@ -159,125 +193,265 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
159 | } | 193 | } |
160 | 194 | ||
161 | /* All torrents done: release lock on current bucket */ | 195 | /* All torrents done: release lock on current bucket */ |
162 | mutex_bucket_unlock( bucket, 0 ); | 196 | mutex_bucket_unlock(bucket, 0); |
163 | 197 | ||
164 | /* Parent thread died? */ | 198 | /* Parent thread died? */ |
165 | if( !g_opentracker_running ) | 199 | if (!g_opentracker_running) |
166 | return; | 200 | return; |
167 | } | 201 | } |
168 | 202 | ||
169 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) | 203 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) |
170 | r += sprintf( r, "ee" ); | 204 | r += sprintf(r, "ee"); |
171 | 205 | ||
172 | /* Release unused memory in current output buffer */ | 206 | /* Send rest of data */ |
173 | iovec_fixlast( iovec_entries, iovector, r ); | 207 | iovector.iov_len = r - (char *)iovector.iov_base; |
208 | if (mutex_workqueue_pushchunked(taskid, &iovector)) | ||
209 | free(iovector.iov_base); | ||
174 | } | 210 | } |
175 | 211 | ||
176 | #ifdef WANT_COMPRESSION_GZIP | 212 | #ifdef WANT_COMPRESSION_GZIP |
177 | 213 | ||
178 | static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { | 214 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { |
179 | int bucket; | 215 | int bucket; |
180 | char *r; | 216 | char *r; |
181 | int zres; | 217 | struct iovec iovector = {NULL, 0}; |
182 | z_stream strm; | 218 | int zres; |
183 | 219 | z_stream strm; | |
184 | /* Setup return vector... */ | 220 | /* Setup return vector... */ |
185 | *iovec_entries = 0; | 221 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
186 | *iovector = NULL; | 222 | if (!iovector.iov_base) |
187 | r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); | ||
188 | if( !r ) | ||
189 | return; | 223 | return; |
190 | 224 | ||
191 | byte_zero( &strm, sizeof(strm) ); | 225 | byte_zero(&strm, sizeof(strm)); |
192 | strm.next_out = (uint8_t*)r; | 226 | strm.next_out = (uint8_t *)iovector.iov_base; |
193 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | 227 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; |
194 | if( deflateInit2(&strm,7,Z_DEFLATED,31,9,Z_DEFAULT_STRATEGY) != Z_OK ) | 228 | if (deflateInit2(&strm, 7, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY) != Z_OK) |
195 | fprintf( stderr, "not ok.\n" ); | 229 | fprintf(stderr, "not ok.\n"); |
196 | 230 | ||
197 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { | 231 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
198 | strm.next_in = (uint8_t*)"d5:filesd"; | 232 | strm.next_in = (uint8_t *)"d5:filesd"; |
199 | strm.avail_in = strlen("d5:filesd"); | 233 | strm.avail_in = strlen("d5:filesd"); |
200 | zres = deflate( &strm, Z_NO_FLUSH ); | 234 | zres = deflate(&strm, Z_NO_FLUSH); |
201 | } | 235 | } |
202 | 236 | ||
203 | /* For each bucket... */ | 237 | /* For each bucket... */ |
204 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 238 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { |
205 | /* Get exclusive access to that bucket */ | 239 | /* Get exclusive access to that bucket */ |
206 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | 240 | ot_vector *torrents_list = mutex_bucket_lock(bucket); |
207 | ot_torrent *torrents = (ot_torrent*)(torrents_list->data); | 241 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); |
208 | size_t i; | 242 | size_t i; |
209 | 243 | ||
210 | /* For each torrent in this bucket.. */ | 244 | /* For each torrent in this bucket.. */ |
211 | for( i=0; i<torrents_list->size; ++i ) { | 245 | for (i = 0; i < torrents_list->size; ++i) { |
212 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | 246 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; |
213 | r = fullscrape_write_one( mode, compress_buffer, torrents[i].peer_list, &torrents[i].hash ); | 247 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); |
214 | strm.next_in = (uint8_t*)compress_buffer; | 248 | strm.next_in = (uint8_t *)compress_buffer; |
215 | strm.avail_in = r - compress_buffer; | 249 | strm.avail_in = r - compress_buffer; |
216 | zres = deflate( &strm, Z_NO_FLUSH ); | 250 | zres = deflate(&strm, Z_NO_FLUSH); |
217 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) | 251 | if ((zres < Z_OK) && (zres != Z_BUF_ERROR)) |
218 | fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); | 252 | fprintf(stderr, "deflate() failed while in fullscrape_make().\n"); |
219 | 253 | ||
220 | /* Check if there still is enough buffer left */ | 254 | /* Check if there still is enough buffer left */ |
221 | while( !strm.avail_out ) { | 255 | while (!strm.avail_out) { |
222 | /* Allocate a fresh output buffer at the end of our buffers list */ | 256 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; |
223 | r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); | 257 | |
224 | if( !r ) { | 258 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
225 | fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); | 259 | free(iovector.iov_base); |
226 | iovec_free( iovec_entries, iovector ); | 260 | return mutex_bucket_unlock(bucket, 0); |
261 | } | ||
262 | /* Allocate a fresh output buffer */ | ||
263 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
264 | if (!iovector.iov_base) { | ||
265 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); | ||
227 | deflateEnd(&strm); | 266 | deflateEnd(&strm); |
228 | return mutex_bucket_unlock( bucket, 0 ); | 267 | return mutex_bucket_unlock(bucket, 0); |
229 | } | 268 | } |
230 | strm.next_out = (uint8_t*)r; | 269 | strm.next_out = (uint8_t *)iovector.iov_base; |
231 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | 270 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; |
232 | zres = deflate( &strm, Z_NO_FLUSH ); | 271 | zres = deflate(&strm, Z_NO_FLUSH); |
233 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) | 272 | if ((zres < Z_OK) && (zres != Z_BUF_ERROR)) |
234 | fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); | 273 | fprintf(stderr, "deflate() failed while in fullscrape_make().\n"); |
235 | } | 274 | } |
236 | } | 275 | } |
237 | 276 | ||
238 | /* All torrents done: release lock on current bucket */ | 277 | /* All torrents done: release lock on current bucket */ |
239 | mutex_bucket_unlock( bucket, 0 ); | 278 | mutex_bucket_unlock(bucket, 0); |
240 | 279 | ||
241 | /* Parent thread died? */ | 280 | /* Parent thread died? */ |
242 | if( !g_opentracker_running ) | 281 | if (!g_opentracker_running) { |
282 | deflateEnd(&strm); | ||
243 | return; | 283 | return; |
284 | } | ||
244 | } | 285 | } |
245 | 286 | ||
246 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { | 287 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
247 | strm.next_in = (uint8_t*)"ee"; | 288 | strm.next_in = (uint8_t *)"ee"; |
248 | strm.avail_in = strlen("ee"); | 289 | strm.avail_in = strlen("ee"); |
249 | } | 290 | } |
250 | 291 | ||
251 | if( deflate( &strm, Z_FINISH ) < Z_OK ) | 292 | if (deflate(&strm, Z_FINISH) < Z_OK) |
252 | fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); | 293 | fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n"); |
253 | |||
254 | if( !strm.avail_out ) { | ||
255 | unsigned int pending; | ||
256 | int bits; | ||
257 | deflatePending( &strm, &pending, &bits); | ||
258 | pending += ( bits ? 1 : 0 ); | ||
259 | 294 | ||
260 | /* Allocate a fresh output buffer at the end of our buffers list */ | 295 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; |
261 | r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, pending ); | 296 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
262 | if( !r ) { | 297 | free(iovector.iov_base); |
263 | fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); | 298 | deflateEnd(&strm); |
264 | deflateEnd(&strm); | 299 | return; |
265 | return mutex_bucket_unlock( bucket, 0 ); | ||
266 | } | ||
267 | strm.next_out = (uint8_t*)r; | ||
268 | strm.avail_out = pending; | ||
269 | if( deflate( &strm, Z_FINISH ) < Z_OK ) | ||
270 | fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); | ||
271 | } | 300 | } |
272 | 301 | ||
273 | /* Release unused memory in current output buffer */ | 302 | /* Check if there's a last batch of data in the zlib buffer */ |
274 | iovec_fixlast( iovec_entries, iovector, strm.next_out ); | 303 | if (!strm.avail_out) { |
304 | /* Allocate a fresh output buffer */ | ||
305 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
306 | |||
307 | if (!iovector.iov_base) { | ||
308 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
309 | deflateEnd(&strm); | ||
310 | return; | ||
311 | } | ||
312 | strm.next_out = iovector.iov_base; | ||
313 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
314 | if (deflate(&strm, Z_FINISH) < Z_OK) | ||
315 | fprintf(stderr, "deflate() failed while in fullscrape_make()'s endgame.\n"); | ||
316 | |||
317 | /* Only pass the new buffer if there actually was some data left in the buffer */ | ||
318 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; | ||
319 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
320 | free(iovector.iov_base); | ||
321 | } | ||
275 | 322 | ||
276 | deflateEnd(&strm); | 323 | deflateEnd(&strm); |
277 | } | 324 | } |
278 | /* WANT_COMPRESSION_GZIP */ | 325 | /* WANT_COMPRESSION_GZIP */ |
279 | #endif | 326 | #endif |
280 | 327 | ||
328 | #ifdef WANT_COMPRESSION_ZSTD | ||
329 | |||
330 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode) { | ||
331 | int bucket; | ||
332 | char *r; | ||
333 | struct iovec iovector = {NULL, 0}; | ||
334 | ZSTD_CCtx *zstream = ZSTD_createCCtx(); | ||
335 | ZSTD_inBuffer inbuf; | ||
336 | ZSTD_outBuffer outbuf; | ||
337 | size_t more_bytes; | ||
338 | |||
339 | if (!zstream) | ||
340 | return; | ||
341 | |||
342 | /* Setup return vector... */ | ||
343 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
344 | if (!iovector.iov_base) { | ||
345 | ZSTD_freeCCtx(zstream); | ||
346 | return; | ||
347 | } | ||
348 | |||
349 | /* Working with a compression level 6 is half as fast as level 3, but | ||
350 | seems to be the last reasonable bump that's worth extra cpu */ | ||
351 | ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6); | ||
352 | |||
353 | outbuf.dst = iovector.iov_base; | ||
354 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
355 | outbuf.pos = 0; | ||
356 | |||
357 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
358 | inbuf.src = (const void *)"d5:filesd"; | ||
359 | inbuf.size = strlen("d5:filesd"); | ||
360 | inbuf.pos = 0; | ||
361 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
362 | } | ||
363 | |||
364 | /* For each bucket... */ | ||
365 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { | ||
366 | /* Get exclusive access to that bucket */ | ||
367 | ot_vector *torrents_list = mutex_bucket_lock(bucket); | ||
368 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); | ||
369 | size_t i; | ||
370 | |||
371 | /* For each torrent in this bucket.. */ | ||
372 | for (i = 0; i < torrents_list->size; ++i) { | ||
373 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
374 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); | ||
375 | inbuf.src = compress_buffer; | ||
376 | inbuf.size = r - compress_buffer; | ||
377 | inbuf.pos = 0; | ||
378 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
379 | |||
380 | /* Check if there still is enough buffer left */ | ||
381 | while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) { | ||
382 | iovector.iov_len = outbuf.size; | ||
383 | |||
384 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
385 | free(iovector.iov_base); | ||
386 | ZSTD_freeCCtx(zstream); | ||
387 | return mutex_bucket_unlock(bucket, 0); | ||
388 | } | ||
389 | /* Allocate a fresh output buffer */ | ||
390 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
391 | if (!iovector.iov_base) { | ||
392 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); | ||
393 | ZSTD_freeCCtx(zstream); | ||
394 | return mutex_bucket_unlock(bucket, 0); | ||
395 | } | ||
396 | |||
397 | outbuf.dst = iovector.iov_base; | ||
398 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
399 | outbuf.pos = 0; | ||
400 | |||
401 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
402 | } | ||
403 | } | ||
404 | |||
405 | /* All torrents done: release lock on current bucket */ | ||
406 | mutex_bucket_unlock(bucket, 0); | ||
407 | |||
408 | /* Parent thread died? */ | ||
409 | if (!g_opentracker_running) | ||
410 | return; | ||
411 | } | ||
412 | |||
413 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
414 | inbuf.src = (const void *)"ee"; | ||
415 | inbuf.size = strlen("ee"); | ||
416 | inbuf.pos = 0; | ||
417 | } | ||
418 | |||
419 | more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
420 | |||
421 | iovector.iov_len = outbuf.pos; | ||
422 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
423 | free(iovector.iov_base); | ||
424 | ZSTD_freeCCtx(zstream); | ||
425 | return; | ||
426 | } | ||
427 | |||
428 | /* Check if there's a last batch of data in the zlib buffer */ | ||
429 | if (more_bytes) { | ||
430 | /* Allocate a fresh output buffer */ | ||
431 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
432 | |||
433 | if (!iovector.iov_base) { | ||
434 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
435 | ZSTD_freeCCtx(zstream); | ||
436 | return; | ||
437 | } | ||
438 | |||
439 | outbuf.dst = iovector.iov_base; | ||
440 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
441 | outbuf.pos = 0; | ||
442 | |||
443 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
444 | |||
445 | /* Only pass the new buffer if there actually was some data left in the buffer */ | ||
446 | iovector.iov_len = outbuf.pos; | ||
447 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
448 | free(iovector.iov_base); | ||
449 | } | ||
450 | |||
451 | ZSTD_freeCCtx(zstream); | ||
452 | } | ||
453 | /* WANT_COMPRESSION_ZSTD */ | ||
454 | #endif | ||
455 | |||
281 | /* WANT_FULLSCRAPE */ | 456 | /* WANT_FULLSCRAPE */ |
282 | #endif | 457 | #endif |
283 | const char *g_version_fullscrape_c = "$Source$: $Revision$\n"; | ||