From 6458a37d82aef8e910b2aed4cf25333e211482ae Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Fri, 16 Nov 2007 00:23:42 +0000 Subject: introducing multithreaded full scrape creation. --- opentracker.c | 40 +++++++++++++++++++++++++--------------- ot_fullscrape.c | 41 +++++++++++++++++++++++++++++++++++------ ot_fullscrape.h | 5 +++-- 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/opentracker.c b/opentracker.c index 501213d..06be4fa 100644 --- a/opentracker.c +++ b/opentracker.c @@ -193,15 +193,29 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec size_t header_size, size = iovec_length( &iovec_entries, &iovector ); tai6464 t; + /* No cookie? Bad socket. Leave. */ if( !h ) { iovec_free( &iovec_entries, &iovector ); - return; + HTTPERROR_500; } + + /* If this socket collected request in a buffer, + free it now */ if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) { h->flag &= ~STRUCT_HTTP_FLAG_ARRAY_USED; array_reset( &h->request ); } + /* If we came here, wait for the answer is over */ + h->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; + + /* Our answers never are 0 bytes. Return an error. */ + if( !iovec_entries || !iovector[0].iov_len ) { + iovec_free( &iovec_entries, &iovector ); + HTTPERROR_500; + } + + /* Prepare space for http header */ header = malloc( SUCCESS_HTTP_HEADER_LENGTH ); if( !header ) { iovec_free( &iovec_entries, &iovector ); @@ -212,7 +226,7 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec iob_reset( &h->batch ); iob_addbuf_free( &h->batch, header, header_size ); - + /* Will move to ot_iovec.c */ for( i=0; ibatch, iovector[i].iov_base, iovector[i].iov_len ); @@ -390,19 +404,15 @@ LOG_TO_STDERR( "sync: %d.%d.%d.%d\n", h->ip[0], h->ip[1], h->ip[2], h->ip[3] ); /* Full scrape... you might want to limit that */ if( !byte_diff( data, 12, "scrape HTTP/" ) ) { - int iovec_entries = 0; - struct iovec * iovector = NULL; - reply_size = return_fullscrape_for_tracker( &iovec_entries, &iovector ); - LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now - ot_start_time), h->ip[0], h->ip[1], h->ip[2], h->ip[3] ); #ifdef _DEBUG_HTTPERROR write( 2, debug_request, l ); #endif - if( !reply_size ) HTTPERROR_500; - - /* Stat keeping */ - stats_issue_event( EVENT_FULLSCRAPE, 1, reply_size); - return sendiovecdata( s, iovec_entries, iovector ); + /* Pass this task to the worker thread */ + h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; + fullscrape_deliver( s ); + io_dontwantread( s ); + return; } SCRAPE_WORKAROUND: @@ -714,9 +724,8 @@ static void handle_timeouted( void ) { static void server_mainloop( ) { time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL; -/* Later we will poll for finished tasks struct iovec *iovector; - int iovec_entries;*/ + int iovec_entries; for( ; ; ) { int64 i; @@ -733,9 +742,8 @@ static void server_mainloop( ) { handle_read( i ); } -/* Later we will poll for finished tasks while( ( i = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 ) - sendiovecdata( i, iovec_entries, iovector ); */ + sendiovecdata( i, iovec_entries, iovector ); while( ( i = io_canwrite( ) ) != -1 ) handle_write( i ); @@ -835,6 +843,8 @@ int main( int argc, char **argv ) { if( trackerlogic_init( serverdir ) == -1 ) panic( "Logic not started" ); + fullscrape_init( ); + g_now = ot_start_time = time( NULL ); alarm(5); diff --git a/ot_fullscrape.c b/ot_fullscrape.c index 3c9540d..58e525f 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c @@ -5,6 +5,7 @@ #include #include #include +#include /* Libowfat */ @@ -23,14 +24,45 @@ /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ #define OT_FULLSCRAPE_MAXENTRYLEN 100 -size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector ) { +/* Forward declaration */ +static void fullscrape_make( int *iovec_entries, struct iovec **iovector ); + +/* This is the entry point into this worker thread + It grabs tasks from mutex_tasklist and delivers results back +*/ +static void * fullscrape_worker( void * args) { + int iovec_entries; + struct iovec *iovector; + + args = args; + + while( 1 ) { + ot_taskid taskid = mutex_workqueue_poptask( OT_TASKTYPE_FULLSCRAPE ); + fullscrape_make( &iovec_entries, &iovector ); + if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) + iovec_free( &iovec_entries, &iovector ); + } + return NULL; +} + +void fullscrape_init( ) { + pthread_t thread_id; + pthread_create( &thread_id, NULL, fullscrape_worker, NULL ); +} + +void fullscrape_deliver( int64 socket ) { + mutex_workqueue_pushtask( socket, OT_TASKTYPE_FULLSCRAPE ); +} + +static void fullscrape_make( int *iovec_entries, struct iovec **iovector ) { int bucket; char *r, *re; /* Setup return vector... */ *iovec_entries = 0; + *iovector = NULL; if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) - return 0; + return; /* ... and pointer to end of current output buffer. This works as a low watermark */ @@ -76,7 +108,7 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto /* Release lock on current bucket and return */ mutex_bucket_unlock( bucket ); - return 0; + return; } /* Adjust new end of output buffer */ @@ -93,7 +125,4 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto /* Release unused memory in current output buffer */ iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) ); - - /* Return answer size */ - return iovec_length( iovec_entries, iovector ); } diff --git a/ot_fullscrape.h b/ot_fullscrape.h index a33d066..9ed4376 100644 --- a/ot_fullscrape.h +++ b/ot_fullscrape.h @@ -4,8 +4,9 @@ #ifndef __OT_FULLSCRAPE_H__ #define __OT_FULLSCRAPE_H__ -#include +#include -size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector ); +void fullscrape_init( ); +void fullscrape_deliver( int64 socket ); #endif -- cgit v1.2.3