summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorerdgeist <>2007-11-16 00:23:42 +0000
committererdgeist <>2007-11-16 00:23:42 +0000
commit6458a37d82aef8e910b2aed4cf25333e211482ae (patch)
tree619db62da7168b8979ec61913128fcbbefa0e128
parentd3963803caab5b1cb30bb58b271dede1b731bbd8 (diff)
introducing multithreaded full scrape creation.
-rw-r--r--opentracker.c40
-rw-r--r--ot_fullscrape.c41
-rw-r--r--ot_fullscrape.h5
3 files changed, 63 insertions, 23 deletions
diff --git a/opentracker.c b/opentracker.c
index 501213d..06be4fa 100644
--- a/opentracker.c
+++ b/opentracker.c
@@ -193,15 +193,29 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec
193 size_t header_size, size = iovec_length( &iovec_entries, &iovector ); 193 size_t header_size, size = iovec_length( &iovec_entries, &iovector );
194 tai6464 t; 194 tai6464 t;
195 195
196 /* No cookie? Bad socket. Leave. */
196 if( !h ) { 197 if( !h ) {
197 iovec_free( &iovec_entries, &iovector ); 198 iovec_free( &iovec_entries, &iovector );
198 return; 199 HTTPERROR_500;
199 } 200 }
201
202 /* If this socket collected request in a buffer,
203 free it now */
200 if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) { 204 if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) {
201 h->flag &= ~STRUCT_HTTP_FLAG_ARRAY_USED; 205 h->flag &= ~STRUCT_HTTP_FLAG_ARRAY_USED;
202 array_reset( &h->request ); 206 array_reset( &h->request );
203 } 207 }
204 208
209 /* If we came here, wait for the answer is over */
210 h->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK;
211
212 /* Our answers never are 0 bytes. Return an error. */
213 if( !iovec_entries || !iovector[0].iov_len ) {
214 iovec_free( &iovec_entries, &iovector );
215 HTTPERROR_500;
216 }
217
218 /* Prepare space for http header */
205 header = malloc( SUCCESS_HTTP_HEADER_LENGTH ); 219 header = malloc( SUCCESS_HTTP_HEADER_LENGTH );
206 if( !header ) { 220 if( !header ) {
207 iovec_free( &iovec_entries, &iovector ); 221 iovec_free( &iovec_entries, &iovector );
@@ -212,7 +226,7 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec
212 226
213 iob_reset( &h->batch ); 227 iob_reset( &h->batch );
214 iob_addbuf_free( &h->batch, header, header_size ); 228 iob_addbuf_free( &h->batch, header, header_size );
215 229
216 /* Will move to ot_iovec.c */ 230 /* Will move to ot_iovec.c */
217 for( i=0; i<iovec_entries; ++i ) 231 for( i=0; i<iovec_entries; ++i )
218 iob_addbuf_munmap( &h->batch, iovector[i].iov_base, iovector[i].iov_len ); 232 iob_addbuf_munmap( &h->batch, iovector[i].iov_base, iovector[i].iov_len );
@@ -390,19 +404,15 @@ LOG_TO_STDERR( "sync: %d.%d.%d.%d\n", h->ip[0], h->ip[1], h->ip[2], h->ip[3] );
390 404
391 /* Full scrape... you might want to limit that */ 405 /* Full scrape... you might want to limit that */
392 if( !byte_diff( data, 12, "scrape HTTP/" ) ) { 406 if( !byte_diff( data, 12, "scrape HTTP/" ) ) {
393 int iovec_entries = 0;
394 struct iovec * iovector = NULL;
395 reply_size = return_fullscrape_for_tracker( &iovec_entries, &iovector );
396
397LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now - ot_start_time), h->ip[0], h->ip[1], h->ip[2], h->ip[3] ); 407LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now - ot_start_time), h->ip[0], h->ip[1], h->ip[2], h->ip[3] );
398#ifdef _DEBUG_HTTPERROR 408#ifdef _DEBUG_HTTPERROR
399write( 2, debug_request, l ); 409write( 2, debug_request, l );
400#endif 410#endif
401 if( !reply_size ) HTTPERROR_500; 411 /* Pass this task to the worker thread */
402 412 h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
403 /* Stat keeping */ 413 fullscrape_deliver( s );
404 stats_issue_event( EVENT_FULLSCRAPE, 1, reply_size); 414 io_dontwantread( s );
405 return sendiovecdata( s, iovec_entries, iovector ); 415 return;
406 } 416 }
407 417
408SCRAPE_WORKAROUND: 418SCRAPE_WORKAROUND:
@@ -714,9 +724,8 @@ static void handle_timeouted( void ) {
714 724
715static void server_mainloop( ) { 725static void server_mainloop( ) {
716 time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL; 726 time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
717/* Later we will poll for finished tasks
718 struct iovec *iovector; 727 struct iovec *iovector;
719 int iovec_entries;*/ 728 int iovec_entries;
720 729
721 for( ; ; ) { 730 for( ; ; ) {
722 int64 i; 731 int64 i;
@@ -733,9 +742,8 @@ static void server_mainloop( ) {
733 handle_read( i ); 742 handle_read( i );
734 } 743 }
735 744
736/* Later we will poll for finished tasks
737 while( ( i = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 ) 745 while( ( i = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 )
738 sendiovecdata( i, iovec_entries, iovector ); */ 746 sendiovecdata( i, iovec_entries, iovector );
739 747
740 while( ( i = io_canwrite( ) ) != -1 ) 748 while( ( i = io_canwrite( ) ) != -1 )
741 handle_write( i ); 749 handle_write( i );
@@ -835,6 +843,8 @@ int main( int argc, char **argv ) {
835 if( trackerlogic_init( serverdir ) == -1 ) 843 if( trackerlogic_init( serverdir ) == -1 )
836 panic( "Logic not started" ); 844 panic( "Logic not started" );
837 845
846 fullscrape_init( );
847
838 g_now = ot_start_time = time( NULL ); 848 g_now = ot_start_time = time( NULL );
839 alarm(5); 849 alarm(5);
840 850
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index 3c9540d..58e525f 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -5,6 +5,7 @@
5#include <sys/uio.h> 5#include <sys/uio.h>
6#include <stdio.h> 6#include <stdio.h>
7#include <string.h> 7#include <string.h>
8#include <pthread.h>
8 9
9/* Libowfat */ 10/* Libowfat */
10 11
@@ -23,14 +24,45 @@
23/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ 24/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
24#define OT_FULLSCRAPE_MAXENTRYLEN 100 25#define OT_FULLSCRAPE_MAXENTRYLEN 100
25 26
26size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector ) { 27/* Forward declaration */
28static void fullscrape_make( int *iovec_entries, struct iovec **iovector );
29
30/* This is the entry point into this worker thread
31 It grabs tasks from mutex_tasklist and delivers results back
32*/
33static void * fullscrape_worker( void * args) {
34 int iovec_entries;
35 struct iovec *iovector;
36
37 args = args;
38
39 while( 1 ) {
40 ot_taskid taskid = mutex_workqueue_poptask( OT_TASKTYPE_FULLSCRAPE );
41 fullscrape_make( &iovec_entries, &iovector );
42 if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
43 iovec_free( &iovec_entries, &iovector );
44 }
45 return NULL;
46}
47
48void fullscrape_init( ) {
49 pthread_t thread_id;
50 pthread_create( &thread_id, NULL, fullscrape_worker, NULL );
51}
52
53void fullscrape_deliver( int64 socket ) {
54 mutex_workqueue_pushtask( socket, OT_TASKTYPE_FULLSCRAPE );
55}
56
57static void fullscrape_make( int *iovec_entries, struct iovec **iovector ) {
27 int bucket; 58 int bucket;
28 char *r, *re; 59 char *r, *re;
29 60
30 /* Setup return vector... */ 61 /* Setup return vector... */
31 *iovec_entries = 0; 62 *iovec_entries = 0;
63 *iovector = NULL;
32 if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) 64 if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
33 return 0; 65 return;
34 66
35 /* ... and pointer to end of current output buffer. 67 /* ... and pointer to end of current output buffer.
36 This works as a low watermark */ 68 This works as a low watermark */
@@ -76,7 +108,7 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto
76 108
77 /* Release lock on current bucket and return */ 109 /* Release lock on current bucket and return */
78 mutex_bucket_unlock( bucket ); 110 mutex_bucket_unlock( bucket );
79 return 0; 111 return;
80 } 112 }
81 113
82 /* Adjust new end of output buffer */ 114 /* Adjust new end of output buffer */
@@ -93,7 +125,4 @@ size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovecto
93 125
94 /* Release unused memory in current output buffer */ 126 /* Release unused memory in current output buffer */
95 iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) ); 127 iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
96
97 /* Return answer size */
98 return iovec_length( iovec_entries, iovector );
99} 128}
diff --git a/ot_fullscrape.h b/ot_fullscrape.h
index a33d066..9ed4376 100644
--- a/ot_fullscrape.h
+++ b/ot_fullscrape.h
@@ -4,8 +4,9 @@
4#ifndef __OT_FULLSCRAPE_H__ 4#ifndef __OT_FULLSCRAPE_H__
5#define __OT_FULLSCRAPE_H__ 5#define __OT_FULLSCRAPE_H__
6 6
7#include <sys/uio.h> 7#include <io.h>
8 8
9size_t return_fullscrape_for_tracker( int *iovec_entries, struct iovec **iovector ); 9void fullscrape_init( );
10void fullscrape_deliver( int64 socket );
10 11
11#endif 12#endif