From bfc398182fb51543a7cf37033ef8d8fa18af5e7a Mon Sep 17 00:00:00 2001 From: Dirk Engling Date: Wed, 21 Apr 2021 14:41:59 +0200 Subject: Rework fullscrape worker, unifying non-gzip and gzip code was a bad idea --- ot_fullscrape.c | 283 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 158 insertions(+), 125 deletions(-) (limited to 'ot_fullscrape.c') diff --git a/ot_fullscrape.c b/ot_fullscrape.c index faea4b9..e7a1c19 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c @@ -35,16 +35,11 @@ /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ #define OT_SCRAPE_MAXENTRYLEN 256 -#ifdef WANT_COMPRESSION_GZIP -#define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK -#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) , param1, param2, param3 -#else -#define IF_COMPRESSION( TASK ) -#define WANT_COMPRESSION_GZIP_PARAM( param1, param2, param3 ) -#endif - /* Forward declaration */ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); +#ifdef WANT_COMPRESSION_GZIP +static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); +#endif /* Converter function from memory to human readable hex strings XXX - Duplicated from ot_stats. Needs fix. */ @@ -59,14 +54,17 @@ static void * fullscrape_worker( void * args ) { (void) args; - while( 1 ) { + while( g_opentracker_running ) { ot_tasktype tasktype = TASK_FULLSCRAPE; ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); - fullscrape_make( &iovec_entries, &iovector, tasktype ); +#ifdef WANT_COMPRESSION_GZIP + if (tasktype & TASK_FLAG_GZIP) + fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); + else +#endif + fullscrape_make( &iovec_entries, &iovector, tasktype ); if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) iovec_free( &iovec_entries, &iovector ); - if( !g_opentracker_running ) - return NULL; } return NULL; } @@ -84,68 +82,57 @@ void fullscrape_deliver( int64 sock, ot_tasktype tasktype ) { mutex_workqueue_pushtask( sock, tasktype ); } -static int fullscrape_increase( int *iovec_entries, struct iovec **iovector, - char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode, int zaction ) ) { - /* Allocate a fresh output buffer at the end of our buffers list */ - if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) { - - /* Deallocate gzip buffers */ - IF_COMPRESSION( deflateEnd(strm); ) - - /* Release lock on current bucket and return */ - return -1; - } - - /* Adjust new end of output buffer */ - *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; - - /* When compressing, we have all the bytes in output buffer */ -#ifdef WANT_COMPRESSION_GZIP - if( mode & TASK_FLAG_GZIP ) { - int zres; - *re -= OT_SCRAPE_MAXENTRYLEN; - strm->next_out = (uint8_t*)*r; - strm->avail_out = OT_SCRAPE_CHUNK_SIZE; - zres = deflate( strm, zaction ); - if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) - fprintf( stderr, "deflate() failed while in fullscrape_increase(%d).\n", zaction ); - *r = (char*)strm->next_out; - } -#endif - - return 0; +static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_peerlist *peer_list, ot_hash *hash ) { + switch( mode & TASK_TASK_MASK ) { + case TASK_FULLSCRAPE: + default: + /* push hash as bencoded string */ + *r++='2'; *r++='0'; *r++=':'; + memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); + /* push rest of the scrape string */ + r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); + + break; + case TASK_FULLSCRAPE_TPB_ASCII: + to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); + r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); + break; + case TASK_FULLSCRAPE_TPB_ASCII_PLUS: + to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); + r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); + break; + case TASK_FULLSCRAPE_TPB_BINARY: + memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); + *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); + *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); + r+=8; + break; + case TASK_FULLSCRAPE_TPB_URLENCODED: + r += fmt_urlencoded( r, (char *)*hash, 20 ); + r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); + break; + case TASK_FULLSCRAPE_TRACKERSTATE: + to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); + r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count ); + break; + } + return r; } static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { int bucket; char *r, *re; -#ifdef WANT_COMPRESSION_GZIP - char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; - z_stream strm; -#endif /* Setup return vector... */ *iovec_entries = 0; *iovector = NULL; - if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) + r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); + if( !r ) return; /* re points to low watermark */ re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; -#ifdef WANT_COMPRESSION_GZIP - if( mode & TASK_FLAG_GZIP ) { - re += OT_SCRAPE_MAXENTRYLEN; - byte_zero( &strm, sizeof(strm) ); - strm.next_in = (uint8_t*)compress_buffer; - strm.next_out = (uint8_t*)r; - strm.avail_out = OT_SCRAPE_CHUNK_SIZE; - if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) - fprintf( stderr, "not ok.\n" ); - r = compress_buffer; - } -#endif - if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) r += sprintf( r, "d5:filesd" ); @@ -153,66 +140,98 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas for( bucket=0; bucketdata); + size_t i; /* For each torrent in this bucket.. */ - for( tor_offset=0; tor_offsetsize; ++tor_offset ) { - /* Address torrents members */ - ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; - ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; - - switch( mode & TASK_TASK_MASK ) { - case TASK_FULLSCRAPE: - default: - /* push hash as bencoded string */ - *r++='2'; *r++='0'; *r++=':'; - memcpy( r, hash, sizeof(ot_hash) ); r += sizeof(ot_hash); - /* push rest of the scrape string */ - r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); - - break; - case TASK_FULLSCRAPE_TPB_ASCII: - to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); - r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); - break; - case TASK_FULLSCRAPE_TPB_ASCII_PLUS: - to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); - r += sprintf( r, ":%zd:%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count, peer_list->down_count ); - break; - case TASK_FULLSCRAPE_TPB_BINARY: - memcpy( r, *hash, sizeof(ot_hash) ); r += sizeof(ot_hash); - *(uint32_t*)(r+0) = htonl( (uint32_t) peer_list->seed_count ); - *(uint32_t*)(r+4) = htonl( (uint32_t)( peer_list->peer_count-peer_list->seed_count) ); - r+=8; - break; - case TASK_FULLSCRAPE_TPB_URLENCODED: - r += fmt_urlencoded( r, (char *)*hash, 20 ); - r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); - break; - case TASK_FULLSCRAPE_TRACKERSTATE: - to_hex( r, *hash ); r+= 2 * sizeof(ot_hash); - r += sprintf( r, ":%zd:%zd\n", peer_list->base, peer_list->down_count ); - break; + for( i=0; isize; ++i ) { + r = fullscrape_write_one( mode, r, torrents[i].peer_list, &torrents[i].hash ); + + if( r > re) { + /* Allocate a fresh output buffer at the end of our buffers list */ + r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); + if( !r ) + return mutex_bucket_unlock( bucket, 0 ); + + /* re points to low watermark */ + re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN; } + } + + /* All torrents done: release lock on current bucket */ + mutex_bucket_unlock( bucket, 0 ); + + /* Parent thread died? */ + if( !g_opentracker_running ) + return; + } + + if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) + r += sprintf( r, "ee" ); + + /* Release unused memory in current output buffer */ + iovec_fixlast( iovec_entries, iovector, r ); +} #ifdef WANT_COMPRESSION_GZIP - if( mode & TASK_FLAG_GZIP ) { - int zres; - strm.next_in = (uint8_t*)compress_buffer; - strm.avail_in = r - compress_buffer; + +static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { + int bucket; + char *r; + int zres; + z_stream strm; + + /* Setup return vector... */ + *iovec_entries = 0; + *iovector = NULL; + r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ); + if( !r ) + return; + + byte_zero( &strm, sizeof(strm) ); + strm.next_out = (uint8_t*)r; + strm.avail_out = OT_SCRAPE_CHUNK_SIZE; + if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) + fprintf( stderr, "not ok.\n" ); + + if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { + strm.next_in = (uint8_t*)"d5:filesd"; + strm.avail_in = strlen("d5:filesd"); + zres = deflate( &strm, Z_NO_FLUSH ); + } + + /* For each bucket... */ + for( bucket=0; bucketdata); + size_t i; + + /* For each torrent in this bucket.. */ + for( i=0; isize; ++i ) { + char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; + r = fullscrape_write_one( mode, compress_buffer, torrents[i].peer_list, &torrents[i].hash ); + strm.next_in = (uint8_t*)compress_buffer; + strm.avail_in = r - compress_buffer; + zres = deflate( &strm, Z_NO_FLUSH ); + if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) + fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); + + /* Check if there still is enough buffer left */ + while( !strm.avail_out ) { + /* Allocate a fresh output buffer at the end of our buffers list */ + r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, OT_SCRAPE_CHUNK_SIZE ); + if( !r ) { + fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); + deflateEnd(&strm); + return mutex_bucket_unlock( bucket, 0 ); + } + strm.next_out = (uint8_t*)r; + strm.avail_out = OT_SCRAPE_CHUNK_SIZE; zres = deflate( &strm, Z_NO_FLUSH ); if( ( zres < Z_OK ) && ( zres != Z_BUF_ERROR ) ) fprintf( stderr, "deflate() failed while in fullscrape_make().\n" ); - r = (char*)strm.next_out; } -#endif - - /* Check if there still is enough buffer left */ - while( r >= re ) - if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_NO_FLUSH ) ) ) - return mutex_bucket_unlock( bucket, 0 ); - - IF_COMPRESSION( r = compress_buffer; ) } /* All torrents done: release lock on current bucket */ @@ -223,27 +242,41 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas return; } - if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) - r += sprintf( r, "ee" ); + if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { + strm.next_in = (uint8_t*)"ee"; + strm.avail_in = strlen("ee"); + } -#ifdef WANT_COMPRESSION_GZIP - if( mode & TASK_FLAG_GZIP ) { - strm.next_in = (uint8_t*)compress_buffer; - strm.avail_in = r - compress_buffer; + if( deflate( &strm, Z_FINISH ) < Z_OK ) + fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); + + if( !strm.avail_out ) { + unsigned int pending; + int bits; + deflatePending( &strm, &pending, &bits); + pending += ( bits ? 1 : 0 ); + + /* Allocate a fresh output buffer at the end of our buffers list */ + r = iovec_fix_increase_or_free( iovec_entries, iovector, strm.next_out, pending ); + if( !r ) { + fprintf( stderr, "Problem with iovec_fix_increase_or_free\n" ); + deflateEnd(&strm); + return mutex_bucket_unlock( bucket, 0 ); + } + strm.next_out = (uint8_t*)r; + strm.avail_out = pending; if( deflate( &strm, Z_FINISH ) < Z_OK ) fprintf( stderr, "deflate() failed while in fullscrape_make()'s endgame.\n" ); - r = (char*)strm.next_out; - - while( r >= re ) - if( fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode, Z_FINISH ) ) ) - return mutex_bucket_unlock( bucket, 0 ); - deflateEnd(&strm); } -#endif /* Release unused memory in current output buffer */ - iovec_fixlast( iovec_entries, iovector, r ); + iovec_fixlast( iovec_entries, iovector, strm.next_out ); + + deflateEnd(&strm); } +/* WANT_COMPRESSION_GZIP */ #endif +/* WANT_FULLSCRAPE */ +#endif const char *g_version_fullscrape_c = "$Source$: $Revision$\n"; -- cgit v1.2.3