From 4072f162b426d2e250d4b95d55e81b6908b8b509 Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Wed, 21 Nov 2007 01:55:15 +0000 Subject: Make sync generation multithreaded. --- ot_sync.c | 121 ++++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 90 insertions(+), 31 deletions(-) (limited to 'ot_sync.c') diff --git a/ot_sync.c b/ot_sync.c index 6e95a98..5261ee6 100644 --- a/ot_sync.c +++ b/ot_sync.c @@ -6,6 +6,7 @@ #include #include #include +#include /* Libowfat */ #include "scan.h" @@ -17,6 +18,9 @@ #include "ot_sync.h" #ifdef WANT_TRACKER_SYNC + +#define OT_SYNC_CHUNK_SIZE (512*1024) + /* Import Changeset from an external authority format: d4:syncd[..]ee [..]: ( 20:01234567890abcdefghij16:XXXXYYYY )+ @@ -60,48 +64,103 @@ int add_changeset_to_tracker( ot_byte *data, size_t len ) { /* Proposed output format d4:syncd20:8*N:(xxxxyyyy)*Nee */ -size_t return_changeset_for_tracker( char **reply ) { - size_t allocated = 0, i, replysize; - ot_vector *torrents_list; +static void sync_make( int *iovec_entries, struct iovec **iovector ) { int bucket; - char *r; + char *r, *re; - /* Maybe there is time to clean_all_torrents(); */ + /* Setup return vector... */ + *iovec_entries = 0; + *iovector = NULL; + if( !( r = iovec_increase( iovec_entries, iovector, OT_SYNC_CHUNK_SIZE ) ) ) + return; - /* Determine space needed for whole changeset */ - for( bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket ) { - torrents_list = mutex_bucket_lock(bucket); - for( i=0; isize; ++i ) { - ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + i; - allocated += sizeof( ot_hash ) + sizeof(ot_peer) * torrent->peer_list->changeset.size + 13; - } - mutex_bucket_unlock(bucket); - } + /* ... and pointer to end of current output buffer. + This works as a low watermark */ + re = r + OT_SYNC_CHUNK_SIZE; - /* add "d4:syncd" and "ee" */ - allocated += 8 + 2; + memmove( r, "d4:syncd", 8 ); r += 8; - if( !( r = *reply = mmap( NULL, allocated, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0 ) ) ) - return 0; + /* For each bucket... */ + for( bucket=0; bucketsize; ++tor_offset ) { + /* Address torrents members */ + ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; + ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; + const size_t byte_count = sizeof(ot_peer) * peer_list->changeset.size; + + /* If we reached our low watermark in buffer... */ + if( re - r <= (ssize_t)(/* strlen( "20:" ) == */ 3 + sizeof( ot_hash ) + /* strlen_max( "%zd" ) == */ 12 + byte_count ) ) { + + /* crop current output buffer to the amount really used */ + iovec_fixlast( iovec_entries, iovector, OT_SYNC_CHUNK_SIZE - ( re - r ) ); + + /* And allocate a fresh output buffer at the end of our buffers list */ + if( !( r = iovec_increase( iovec_entries, iovector, OT_SYNC_CHUNK_SIZE ) ) ) { + + /* If this fails: free buffers */ + iovec_free( iovec_entries, iovector ); + + /* Release lock on current bucket and return */ + mutex_bucket_unlock( bucket ); + return; + } + + /* Adjust new end of output buffer */ + re = r + OT_SYNC_CHUNK_SIZE; + } - memmove( r, "d4:syncd", 8 ); r += 8; - for( bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket ) { - torrents_list = mutex_bucket_lock(bucket); - for( i=0; isize; ++i ) { - ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + i; - const size_t byte_count = sizeof(ot_peer) * torrent->peer_list->changeset.size; *r++ = '2'; *r++ = '0'; *r++ = ':'; - memmove( r, torrent->hash, sizeof( ot_hash ) ); r += sizeof( ot_hash ); + memmove( r, hash, sizeof( ot_hash ) ); r += sizeof( ot_hash ); r += sprintf( r, "%zd:", byte_count ); - memmove( r, torrent->peer_list->changeset.data, byte_count ); r += byte_count; + memmove( r, peer_list->changeset.data, byte_count ); r += byte_count; } - mutex_bucket_unlock(bucket); + + /* All torrents done: release lock on currenct bucket */ + mutex_bucket_unlock( bucket ); } - *r++ = 'e'; *r++ = 'e'; - replysize = ( r - *reply ); - fix_mmapallocation( *reply, allocated, replysize ); + /* Close bencoded sync dictionary */ + *r++='e'; *r++='e'; - return replysize; + /* Release unused memory in current output buffer */ + iovec_fixlast( iovec_entries, iovector, OT_SYNC_CHUNK_SIZE - ( re - r ) ); } + +/* This is the entry point into this worker thread + It grabs tasks from mutex_tasklist and delivers results back +*/ +static void * sync_worker( void * args) { + int iovec_entries; + struct iovec *iovector; + + args = args; + + while( 1 ) { + ot_tasktype tasktype = TASK_SYNC_OUT; + ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); + sync_make( &iovec_entries, &iovector ); + if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) + iovec_free( &iovec_entries, &iovector ); + } + return NULL; +} + +static pthread_t thread_id; +void sync_init( ) { + pthread_create( &thread_id, NULL, sync_worker, NULL ); +} + +void sync_deinit( ) { + pthread_cancel( thread_id ); +} + +void sync_deliver( int64 socket ) { + mutex_workqueue_pushtask( socket, TASK_SYNC_OUT ); +} + #endif -- cgit v1.2.3