summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDirk Engling <erdgeist@erdgeist.org>2021-04-21 14:41:59 +0200
committerDirk Engling <erdgeist@erdgeist.org>2021-04-21 14:41:59 +0200
commitbfc398182fb51543a7cf37033ef8d8fa18af5e7a (patch)
treee7f1b6f5968aa74dc69f95c93f0f49f40855181d
parent27f8189d845779a37b008b2927d81faad3dd4c96 (diff)
Rework fullscrape worker, unifying non-gzip and gzip code was a bad idea
-rw-r--r--ot_fullscrape.c283
1 files changed, 158 insertions, 125 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index faea4b9..e7a1c19 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -35,16 +35,11 @@
35/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ 35/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
36#define OT_SCRAPE_MAXENTRYLEN 256 36#define OT_SCRAPE_MAXENTRYLEN 256
37 37
38#ifdef WANT_COMPRESSION_GZIP
39#define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK
40#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) , param1, param2, param3
41#else
42#define IF_COMPRESSION( TASK )
43#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 )
44#endif
45
46/* Forward declaration */ 38/* Forward declaration */
47static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); 39static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
40#ifdef WANT_COMPRESSION_GZIP
41static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
42#endif
48 43
49/* Converter function from memory to human readable hex strings 44/* Converter function from memory to human readable hex strings
50 XXX - Duplicated from ot_stats. Needs fix. */ 45 XXX - Duplicated from ot_stats. Needs fix. */
@@ -59,14 +54,17 @@ static void * fullscrape_worker( void * args ) {
59 54
60 (void) args; 55 (void) args;
61 56
62 while( 1 ) { 57 while( g_opentracker_running ) {
63 ot_tasktype tasktype = TASK_FULLSCRAPE; 58 ot_tasktype tasktype = TASK_FULLSCRAPE;
64 ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); 59 ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
65 fullscrape_make( &iovec_entries, &iovector, tasktype ); 60#ifdef WANT_COMPRESSION_GZIP
61 if (tasktype & TASK_FLAG_GZIP)
62 fullscrape_make_gzip( &iovec_entries, &iovector, tasktype );
63 else
64#endif
65 fullscrape_make( &iovec_entries, &iovector, tasktype );
66 if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) 66 if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
67 iovec_free( &iovec_entries, &iovector ); 67 iovec_free( &iovec_entries, &iovector );
68 if( !g_opentracker_running )
69 return NULL;
70 } 68 }
71 return NULL; 69 return NULL;
72} 70}
@@ -84,68 +82,57 @@ void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) {
84 mutex_workqueue_pushtask( sock, tasktype ); 82 mutex_workqueue_pushtask( sock, tasktype );
85} 83}
86 84
87static int fullscrape_increase( int *iovec_entries, struct iovec **iovector, 85static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_peerlist *peer_list, ot_hash *hash ) {
88 char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode, int zaction ) ) { 86 switch( mode & TASK_TASK_MASK ) {
89 /* Allocate a fresh output buffer at the end of our buffers list */ 87 case TASK_FULLSCRAPE:
90 if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) { 88 default:
91 89 /* push hash as bencoded string */
92 /* Deallocate gzip buffers */ 90 *r++='2'; *r++='0'; *r++=':';
93 IF_COMPRESSION( deflateEnd(strm); ) 91 memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
94 92 /* push rest of the scrape string */
95 /* Release lock on current bucket and return */ 93 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count );
96 return -1; 94
97 } 95 break;
98 96 case TASK_FULLSCRAPE_TPB_ASCII:
99 /* Adjust new end of output buffer */ 97 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
100 *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 98 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
101 99 break;
102 /* When compressing, we have all the bytes in output buffer */ 100 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
103#ifdef WANT_COMPRESSION_GZIP 101 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
104 if( mode & TASK_FLAG_GZIP ) { 102 r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count );
105 int zres; 103 break;
106 *re -= OT_SCRAPE_MAXENTRYLEN; 104 case TASK_FULLSCRAPE_TPB_BINARY:
107 strm->next_out = (uint8_t*)*r; 105 memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
108 strm->avail_out = OT_SCRAPE_CHUNK_SIZE; 106 *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count );
109 zres = deflate( strm, zaction ); 107 *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) );
110 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 108 r+=8;
111 fprintf( stderr, "deflate() failed while in fullscrape_increase(%d).\n", zaction ); 109 break;
112 *r = (char*)strm->next_out; 110 case TASK_FULLSCRAPE_TPB_URLENCODED:
113 } 111 r += fmt_urlencoded( r, (char *)*hash, 20 );
114#endif 112 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
115 113 break;
116 return 0; 114 case TASK_FULLSCRAPE_TRACKERSTATE:
115 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
116 r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count );
117 break;
118 }
119 return r;
117} 120}
118 121
119static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { 122static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
120 int bucket; 123 int bucket;
121 char *r, *re; 124 char *r, *re;
122#ifdef WANT_COMPRESSION_GZIP
123 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
124 z_stream strm;
125#endif
126 125
127 /* Setup return vector... */ 126 /* Setup return vector... */
128 *iovec_entries = 0; 127 *iovec_entries = 0;
129 *iovector = NULL; 128 *iovector = NULL;
130 if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) 129 r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE );
130 if( !r )
131 return; 131 return;
132 132
133 /* re points to low watermark */ 133 /* re points to low watermark */
134 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; 134 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
135 135
136#ifdef WANT_COMPRESSION_GZIP
137 if( mode & TASK_FLAG_GZIP ) {
138 re += OT_SCRAPE_MAXENTRYLEN;
139 byte_zero( &strm, sizeof(strm) );
140 strm.next_in = (uint8_t*)compress_buffer;
141 strm.next_out = (uint8_t*)r;
142 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
143 if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
144 fprintf( stderr, "not ok.\n" );
145 r = compress_buffer;
146 }
147#endif
148
149 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) 136 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
150 r += sprintf( r, "d5:filesd" ); 137 r += sprintf( r, "d5:filesd" );
151 138
@@ -153,66 +140,98 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
153 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 140 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
154 /* Get exclusive access to that bucket */ 141 /* Get exclusive access to that bucket */
155 ot_vector *torrents_list = mutex_bucket_lock( bucket ); 142 ot_vector *torrents_list = mutex_bucket_lock( bucket );
156 size_t tor_offset; 143 ot_torrent *torrents = (ot_torrent*)(torrents_list->data);
144 size_t i;
157 145
158 /* For each torrent in this bucket.. */ 146 /* For each torrent in this bucket.. */
159 for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { 147 for( i=0; i<torrents_list->size; ++i ) {
160 /* Address torrents members */ 148 r = fullscrape_write_one( mode, r, torrents[i].peer_list, &torrents[i].hash );
161 ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; 149
162 ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; 150 if( r > re) {
163 151 /* Allocate a fresh output buffer at the end of our buffers list */
164 switch( mode & TASK_TASK_MASK ) { 152 r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE );
165 case TASK_FULLSCRAPE: 153 if( !r )
166 default: 154 return mutex_bucket_unlock( bucket, 0 );
167 /* push hash as bencoded string */ 155
168 *r++='2'; *r++='0'; *r++=':'; 156 /* re points to low watermark */
169 memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); 157 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
170 /* push rest of the scrape string */
171 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count );
172
173 break;
174 case TASK_FULLSCRAPE_TPB_ASCII:
175 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
176 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
177 break;
178 case TASK_FULLSCRAPE_TPB_ASCII_PLUS:
179 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
180 r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count );
181 break;
182 case TASK_FULLSCRAPE_TPB_BINARY:
183 memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash);
184 *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count );
185 *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) );
186 r+=8;
187 break;
188 case TASK_FULLSCRAPE_TPB_URLENCODED:
189 r += fmt_urlencoded( r, (char *)*hash, 20 );
190 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
191 break;
192 case TASK_FULLSCRAPE_TRACKERSTATE:
193 to_hex( r, *hash ); r+= 2 * sizeof(ot_hash);
194 r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count );
195 break;
196 } 158 }
159 }
160
161 /* All torrents done: release lock on current bucket */
162 mutex_bucket_unlock( bucket, 0 );
163
164 /* Parent thread died? */
165 if( !g_opentracker_running )
166 return;
167 }
168
169 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
170 r += sprintf( r, "ee" );
171
172 /* Release unused memory in current output buffer */
173 iovec_fixlast( iovec_entries, iovector, r );
174}
197 175
198#ifdef WANT_COMPRESSION_GZIP 176#ifdef WANT_COMPRESSION_GZIP
199 if( mode & TASK_FLAG_GZIP ) { 177
200 int zres; 178static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
201 strm.next_in = (uint8_t*)compress_buffer; 179 int bucket;
202 strm.avail_in = r - compress_buffer; 180 char *r;
181 int zres;
182 z_stream strm;
183
184 /* Setup return vector... */
185 *iovec_entries = 0;
186 *iovector = NULL;
187 r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE );
188 if( !r )
189 return;
190
191 byte_zero( &strm, sizeof(strm) );
192 strm.next_out = (uint8_t*)r;
193 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
194 if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
195 fprintf( stderr, "not ok.\n" );
196
197 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
198 strm.next_in = (uint8_t*)"d5:filesd";
199 strm.avail_in = strlen("d5:filesd");
200 zres = deflate( &strm, Z_NO_FLUSH );
201 }
202
203 /* For each bucket... */
204 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
205 /* Get exclusive access to that bucket */
206 ot_vector *torrents_list = mutex_bucket_lock( bucket );
207 ot_torrent *torrents = (ot_torrent*)(torrents_list->data);
208 size_t i;
209
210 /* For each torrent in this bucket.. */
211 for( i=0; i<torrents_list->size; ++i ) {
212 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
213 r = fullscrape_write_one( mode, compress_buffer, torrents[i].peer_list, &torrents[i].hash );
214 strm.next_in = (uint8_t*)compress_buffer;
215 strm.avail_in = r - compress_buffer;
216 zres = deflate( &strm, Z_NO_FLUSH );
217 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) )
218 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" );
219
220 /* Check if there still is enough buffer left */
221 while( !strm.avail_out ) {
222 /* Allocate a fresh output buffer at the end of our buffers list */
223 r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, OT_SCRAPE_CHUNK_SIZE );
224 if( !r ) {
225 fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" );
226 deflateEnd(&strm);
227 return mutex_bucket_unlock( bucket, 0 );
228 }
229 strm.next_out = (uint8_t*)r;
230 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
203 zres = deflate( &strm, Z_NO_FLUSH ); 231 zres = deflate( &strm, Z_NO_FLUSH );
204 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) 232 if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) )
205 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); 233 fprintf( stderr, "deflate() failed while in fullscrape_make().\n" );
206 r = (char*)strm.next_out;
207 } 234 }
208#endif
209
210 /* Check if there still is enough buffer left */
211 while( r >= re )
212 if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_NO_FLUSH ) ) )
213 return mutex_bucket_unlock( bucket, 0 );
214
215 IF_COMPRESSION( r = compress_buffer; )
216 } 235 }
217 236
218 /* All torrents done: release lock on current bucket */ 237 /* All torrents done: release lock on current bucket */
@@ -223,27 +242,41 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
223 return; 242 return;
224 } 243 }
225 244
226 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) 245 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
227 r += sprintf( r, "ee" ); 246 strm.next_in = (uint8_t*)"ee";
247 strm.avail_in = strlen("ee");
248 }
228 249
229#ifdef WANT_COMPRESSION_GZIP 250 if( deflate( &strm, Z_FINISH ) < Z_OK )
230 if( mode & TASK_FLAG_GZIP ) { 251 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
231 strm.next_in = (uint8_t*)compress_buffer; 252
232 strm.avail_in = r - compress_buffer; 253 if( !strm.avail_out ) {
254 unsigned int pending;
255 int bits;
256 deflatePending( &strm, &pending, &bits);
257 pending += ( bits ? 1 : 0 );
258
259 /* Allocate a fresh output buffer at the end of our buffers list */
260 r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, pending );
261 if( !r ) {
262 fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" );
263 deflateEnd(&strm);
264 return mutex_bucket_unlock( bucket, 0 );
265 }
266 strm.next_out = (uint8_t*)r;
267 strm.avail_out = pending;
233 if( deflate( &strm, Z_FINISH ) < Z_OK ) 268 if( deflate( &strm, Z_FINISH ) < Z_OK )
234 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); 269 fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" );
235 r = (char*)strm.next_out;
236
237 while( r >= re )
238 if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_FINISH ) ) )
239 return mutex_bucket_unlock( bucket, 0 );
240 deflateEnd(&strm);
241 } 270 }
242#endif
243 271
244 /* Release unused memory in current output buffer */ 272 /* Release unused memory in current output buffer */
245 iovec_fixlast( iovec_entries, iovector, r ); 273 iovec_fixlast( iovec_entries, iovector, strm.next_out );
274
275 deflateEnd(&strm);
246} 276}
277/* WANT_COMPRESSION_GZIP */
247#endif 278#endif
248 279
280/* WANT_FULLSCRAPE */
281#endif
249const char *g_version_fullscrape_c = "$Source$: $Revision$\n"; 282const char *g_version_fullscrape_c = "$Source$: $Revision$\n";