diff options
author | Dirk Engling <erdgeist@erdgeist.org> | 2021-04-21 14:41:59 +0200 |
---|---|---|
committer | Dirk Engling <erdgeist@erdgeist.org> | 2021-04-21 14:41:59 +0200 |
commit | bfc398182fb51543a7cf37033ef8d8fa18af5e7a (patch) | |
tree | e7f1b6f5968aa74dc69f95c93f0f49f40855181d | |
parent | 27f8189d845779a37b008b2927d81faad3dd4c96 (diff) |
Rework fullscrape worker, unifying non-gzip and gzip code was a bad idea
-rw-r--r-- | ot_fullscrape.c | 283 |
1 files changed, 158 insertions, 125 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index faea4b9..e7a1c19 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
@@ -35,16 +35,11 @@ | |||
35 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ | 35 | /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ |
36 | #define OT_SCRAPE_MAXENTRYLEN 256 | 36 | #define OT_SCRAPE_MAXENTRYLEN 256 |
37 | 37 | ||
38 | #ifdef WANT_COMPRESSION_GZIP | ||
39 | #define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK | ||
40 | #define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) , param1, param2, param3 | ||
41 | #else | ||
42 | #define IF_COMPRESSION( TASK ) | ||
43 | #define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) | ||
44 | #endif | ||
45 | |||
46 | /* Forward declaration */ | 38 | /* Forward declaration */ |
47 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | 39 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); |
40 | #ifdef WANT_COMPRESSION_GZIP | ||
41 | static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); | ||
42 | #endif | ||
48 | 43 | ||
49 | /* Converter function from memory to human readable hex strings | 44 | /* Converter function from memory to human readable hex strings |
50 | XXX - Duplicated from ot_stats. Needs fix. */ | 45 | XXX - Duplicated from ot_stats. Needs fix. */ |
@@ -59,14 +54,17 @@ static void * fullscrape_worker( void * args ) { | |||
59 | 54 | ||
60 | (void) args; | 55 | (void) args; |
61 | 56 | ||
62 | while( 1 ) { | 57 | while( g_opentracker_running ) { |
63 | ot_tasktype tasktype = TASK_FULLSCRAPE; | 58 | ot_tasktype tasktype = TASK_FULLSCRAPE; |
64 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); | 59 | ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); |
65 | fullscrape_make( &iovec_entries, &iovector, tasktype ); | 60 | #ifdef WANT_COMPRESSION_GZIP |
61 | if (tasktype & TASK_FLAG_GZIP) | ||
62 | fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); | ||
63 | else | ||
64 | #endif | ||
65 | fullscrape_make( &iovec_entries, &iovector, tasktype ); | ||
66 | if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) | 66 | if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) |
67 | iovec_free( &iovec_entries, &iovector ); | 67 | iovec_free( &iovec_entries, &iovector ); |
68 | if( !g_opentracker_running ) | ||
69 | return NULL; | ||
70 | } | 68 | } |
71 | return NULL; | 69 | return NULL; |
72 | } | 70 | } |
@@ -84,68 +82,57 @@ void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) { | |||
84 | mutex_workqueue_pushtask( sock, tasktype ); | 82 | mutex_workqueue_pushtask( sock, tasktype ); |
85 | } | 83 | } |
86 | 84 | ||
87 | static int fullscrape_increase( int *iovec_entries, struct iovec **iovector, | 85 | static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_peerlist *peer_list, ot_hash *hash ) { |
88 | char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode, int zaction ) ) { | 86 | switch( mode & TASK_TASK_MASK ) { |
89 | /* Allocate a fresh output buffer at the end of our buffers list */ | 87 | case TASK_FULLSCRAPE: |
90 | if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) { | 88 | default: |
91 | 89 | /* push hash as bencoded string */ | |
92 | /* Deallocate gzip buffers */ | 90 | *r++='2'; *r++='0'; *r++=':'; |
93 | IF_COMPRESSION( deflateEnd(strm); ) | 91 | memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); |
94 | 92 | /* push rest of the scrape string */ | |
95 | /* Release lock on current bucket and return */ | 93 | r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); |
96 | return -1; | 94 | |
97 | } | 95 | break; |
98 | 96 | case TASK_FULLSCRAPE_TPB_ASCII: | |
99 | /* Adjust new end of output buffer */ | 97 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); |
100 | *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | 98 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); |
101 | 99 | break; | |
102 | /* When compressing, we have all the bytes in output buffer */ | 100 | case TASK_FULLSCRAPE_TPB_ASCII_PLUS: |
103 | #ifdef WANT_COMPRESSION_GZIP | 101 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); |
104 | if( mode & TASK_FLAG_GZIP ) { | 102 | r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); |
105 | int zres; | 103 | break; |
106 | *re -= OT_SCRAPE_MAXENTRYLEN; | 104 | case TASK_FULLSCRAPE_TPB_BINARY: |
107 | strm->next_out = (uint8_t*)*r; | 105 | memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); |
108 | strm->avail_out = OT_SCRAPE_CHUNK_SIZE; | 106 | *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); |
109 | zres = deflate( strm, zaction ); | 107 | *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); |
110 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) | 108 | r+=8; |
111 | fprintf( stderr, "deflate() failed while in fullscrape_increase(%d).\n", zaction ); | 109 | break; |
112 | *r = (char*)strm->next_out; | 110 | case TASK_FULLSCRAPE_TPB_URLENCODED: |
113 | } | 111 | r += fmt_urlencoded( r, (char *)*hash, 20 ); |
114 | #endif | 112 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); |
115 | 113 | break; | |
116 | return 0; | 114 | case TASK_FULLSCRAPE_TRACKERSTATE: |
115 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | ||
116 | r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count ); | ||
117 | break; | ||
118 | } | ||
119 | return r; | ||
117 | } | 120 | } |
118 | 121 | ||
119 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { | 122 | static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { |
120 | int bucket; | 123 | int bucket; |
121 | char *r, *re; | 124 | char *r, *re; |
122 | #ifdef WANT_COMPRESSION_GZIP | ||
123 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
124 | z_stream strm; | ||
125 | #endif | ||
126 | 125 | ||
127 | /* Setup return vector... */ | 126 | /* Setup return vector... */ |
128 | *iovec_entries = 0; | 127 | *iovec_entries = 0; |
129 | *iovector = NULL; | 128 | *iovector = NULL; |
130 | if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) | 129 | r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); |
130 | if( !r ) | ||
131 | return; | 131 | return; |
132 | 132 | ||
133 | /* re points to low watermark */ | 133 | /* re points to low watermark */ |
134 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; | 134 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; |
135 | 135 | ||
136 | #ifdef WANT_COMPRESSION_GZIP | ||
137 | if( mode & TASK_FLAG_GZIP ) { | ||
138 | re += OT_SCRAPE_MAXENTRYLEN; | ||
139 | byte_zero( &strm, sizeof(strm) ); | ||
140 | strm.next_in = (uint8_t*)compress_buffer; | ||
141 | strm.next_out = (uint8_t*)r; | ||
142 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
143 | if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) | ||
144 | fprintf( stderr, "not ok.\n" ); | ||
145 | r = compress_buffer; | ||
146 | } | ||
147 | #endif | ||
148 | |||
149 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) | 136 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) |
150 | r += sprintf( r, "d5:filesd" ); | 137 | r += sprintf( r, "d5:filesd" ); |
151 | 138 | ||
@@ -153,66 +140,98 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
153 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 140 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { |
154 | /* Get exclusive access to that bucket */ | 141 | /* Get exclusive access to that bucket */ |
155 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | 142 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); |
156 | size_t tor_offset; | 143 | ot_torrent *torrents = (ot_torrent*)(torrents_list->data); |
144 | size_t i; | ||
157 | 145 | ||
158 | /* For each torrent in this bucket.. */ | 146 | /* For each torrent in this bucket.. */ |
159 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 147 | for( i=0; i<torrents_list->size; ++i ) { |
160 | /* Address torrents members */ | 148 | r = fullscrape_write_one( mode, r, torrents[i].peer_list, &torrents[i].hash ); |
161 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | 149 | |
162 | ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; | 150 | if( r > re) { |
163 | 151 | /* Allocate a fresh output buffer at the end of our buffers list */ | |
164 | switch( mode & TASK_TASK_MASK ) { | 152 | r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); |
165 | case TASK_FULLSCRAPE: | 153 | if( !r ) |
166 | default: | 154 | return mutex_bucket_unlock( bucket, 0 ); |
167 | /* push hash as bencoded string */ | 155 | |
168 | *r++='2'; *r++='0'; *r++=':'; | 156 | /* re points to low watermark */ |
169 | memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); | 157 | re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; |
170 | /* push rest of the scrape string */ | ||
171 | r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); | ||
172 | |||
173 | break; | ||
174 | case TASK_FULLSCRAPE_TPB_ASCII: | ||
175 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | ||
176 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); | ||
177 | break; | ||
178 | case TASK_FULLSCRAPE_TPB_ASCII_PLUS: | ||
179 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | ||
180 | r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); | ||
181 | break; | ||
182 | case TASK_FULLSCRAPE_TPB_BINARY: | ||
183 | memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); | ||
184 | *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); | ||
185 | *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); | ||
186 | r+=8; | ||
187 | break; | ||
188 | case TASK_FULLSCRAPE_TPB_URLENCODED: | ||
189 | r += fmt_urlencoded( r, (char *)*hash, 20 ); | ||
190 | r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); | ||
191 | break; | ||
192 | case TASK_FULLSCRAPE_TRACKERSTATE: | ||
193 | to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); | ||
194 | r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count ); | ||
195 | break; | ||
196 | } | 158 | } |
159 | } | ||
160 | |||
161 | /* All torrents done: release lock on current bucket */ | ||
162 | mutex_bucket_unlock( bucket, 0 ); | ||
163 | |||
164 | /* Parent thread died? */ | ||
165 | if( !g_opentracker_running ) | ||
166 | return; | ||
167 | } | ||
168 | |||
169 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) | ||
170 | r += sprintf( r, "ee" ); | ||
171 | |||
172 | /* Release unused memory in current output buffer */ | ||
173 | iovec_fixlast( iovec_entries, iovector, r ); | ||
174 | } | ||
197 | 175 | ||
198 | #ifdef WANT_COMPRESSION_GZIP | 176 | #ifdef WANT_COMPRESSION_GZIP |
199 | if( mode & TASK_FLAG_GZIP ) { | 177 | |
200 | int zres; | 178 | static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { |
201 | strm.next_in = (uint8_t*)compress_buffer; | 179 | int bucket; |
202 | strm.avail_in = r - compress_buffer; | 180 | char *r; |
181 | int zres; | ||
182 | z_stream strm; | ||
183 | |||
184 | /* Setup return vector... */ | ||
185 | *iovec_entries = 0; | ||
186 | *iovector = NULL; | ||
187 | r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); | ||
188 | if( !r ) | ||
189 | return; | ||
190 | |||
191 | byte_zero( &strm, sizeof(strm) ); | ||
192 | strm.next_out = (uint8_t*)r; | ||
193 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
194 | if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) | ||
195 | fprintf( stderr, "not ok.\n" ); | ||
196 | |||
197 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { | ||
198 | strm.next_in = (uint8_t*)"d5:filesd"; | ||
199 | strm.avail_in = strlen("d5:filesd"); | ||
200 | zres = deflate( &strm, Z_NO_FLUSH ); | ||
201 | } | ||
202 | |||
203 | /* For each bucket... */ | ||
204 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | ||
205 | /* Get exclusive access to that bucket */ | ||
206 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | ||
207 | ot_torrent *torrents = (ot_torrent*)(torrents_list->data); | ||
208 | size_t i; | ||
209 | |||
210 | /* For each torrent in this bucket.. */ | ||
211 | for( i=0; i<torrents_list->size; ++i ) { | ||
212 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
213 | r = fullscrape_write_one( mode, compress_buffer, torrents[i].peer_list, &torrents[i].hash ); | ||
214 | strm.next_in = (uint8_t*)compress_buffer; | ||
215 | strm.avail_in = r - compress_buffer; | ||
216 | zres = deflate( &strm, Z_NO_FLUSH ); | ||
217 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) | ||
218 | fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); | ||
219 | |||
220 | /* Check if there still is enough buffer left */ | ||
221 | while( !strm.avail_out ) { | ||
222 | /* Allocate a fresh output buffer at the end of our buffers list */ | ||
223 | r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, OT_SCRAPE_CHUNK_SIZE ); | ||
224 | if( !r ) { | ||
225 | fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); | ||
226 | deflateEnd(&strm); | ||
227 | return mutex_bucket_unlock( bucket, 0 ); | ||
228 | } | ||
229 | strm.next_out = (uint8_t*)r; | ||
230 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | ||
203 | zres = deflate( &strm, Z_NO_FLUSH ); | 231 | zres = deflate( &strm, Z_NO_FLUSH ); |
204 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) | 232 | if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) |
205 | fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); | 233 | fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); |
206 | r = (char*)strm.next_out; | ||
207 | } | 234 | } |
208 | #endif | ||
209 | |||
210 | /* Check if there still is enough buffer left */ | ||
211 | while( r >= re ) | ||
212 | if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_NO_FLUSH ) ) ) | ||
213 | return mutex_bucket_unlock( bucket, 0 ); | ||
214 | |||
215 | IF_COMPRESSION( r = compress_buffer; ) | ||
216 | } | 235 | } |
217 | 236 | ||
218 | /* All torrents done: release lock on current bucket */ | 237 | /* All torrents done: release lock on current bucket */ |
@@ -223,27 +242,41 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas | |||
223 | return; | 242 | return; |
224 | } | 243 | } |
225 | 244 | ||
226 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) | 245 | if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { |
227 | r += sprintf( r, "ee" ); | 246 | strm.next_in = (uint8_t*)"ee"; |
247 | strm.avail_in = strlen("ee"); | ||
248 | } | ||
228 | 249 | ||
229 | #ifdef WANT_COMPRESSION_GZIP | 250 | if( deflate( &strm, Z_FINISH ) < Z_OK ) |
230 | if( mode & TASK_FLAG_GZIP ) { | 251 | fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); |
231 | strm.next_in = (uint8_t*)compress_buffer; | 252 | |
232 | strm.avail_in = r - compress_buffer; | 253 | if( !strm.avail_out ) { |
254 | unsigned int pending; | ||
255 | int bits; | ||
256 | deflatePending( &strm, &pending, &bits); | ||
257 | pending += ( bits ? 1 : 0 ); | ||
258 | |||
259 | /* Allocate a fresh output buffer at the end of our buffers list */ | ||
260 | r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, pending ); | ||
261 | if( !r ) { | ||
262 | fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); | ||
263 | deflateEnd(&strm); | ||
264 | return mutex_bucket_unlock( bucket, 0 ); | ||
265 | } | ||
266 | strm.next_out = (uint8_t*)r; | ||
267 | strm.avail_out = pending; | ||
233 | if( deflate( &strm, Z_FINISH ) < Z_OK ) | 268 | if( deflate( &strm, Z_FINISH ) < Z_OK ) |
234 | fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); | 269 | fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); |
235 | r = (char*)strm.next_out; | ||
236 | |||
237 | while( r >= re ) | ||
238 | if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_FINISH ) ) ) | ||
239 | return mutex_bucket_unlock( bucket, 0 ); | ||
240 | deflateEnd(&strm); | ||
241 | } | 270 | } |
242 | #endif | ||
243 | 271 | ||
244 | /* Release unused memory in current output buffer */ | 272 | /* Release unused memory in current output buffer */ |
245 | iovec_fixlast( iovec_entries, iovector, r ); | 273 | iovec_fixlast( iovec_entries, iovector, strm.next_out ); |
274 | |||
275 | deflateEnd(&strm); | ||
246 | } | 276 | } |
277 | /* WANT_COMPRESSION_GZIP */ | ||
247 | #endif | 278 | #endif |
248 | 279 | ||
280 | /* WANT_FULLSCRAPE */ | ||
281 | #endif | ||
249 | const char *g_version_fullscrape_c = "$Source$: $Revision$\n"; | 282 | const char *g_version_fullscrape_c = "$Source$: $Revision$\n"; |