From 1a70d9f9ef81ac1b5e843ac71f3538f7845e03ae Mon Sep 17 00:00:00 2001 From: Dirk Engling Date: Sat, 13 Apr 2024 00:47:29 +0200 Subject: First shot on chunked transfers --- ot_mutex.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-) (limited to 'ot_mutex.c') diff --git a/ot_mutex.c b/ot_mutex.c index 497b1af..b61b915 100644 --- a/ot_mutex.c +++ b/ot_mutex.c @@ -17,6 +17,7 @@ /* Opentracker */ #include "trackerlogic.h" +#include "ot_iovec.h" #include "ot_mutex.h" #include "ot_stats.h" @@ -194,23 +195,66 @@ int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iove return task ? 0 : -1; } -int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) { +int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) { + struct ot_task * task; + const char byte = 'o'; + + /* Want exclusive access to tasklist */ + pthread_mutex_lock( &tasklist_mutex ); + + for (task = tasklist; task; task = task->next) + if (task->taskid == taskid) { + if( iovec ) { +fprintf(stderr, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid); + if (!iovec_append(&task->iovec_entries, &task->iovec, iovec) ) + return -1; + task->tasktype = TASK_DONE_PARTIAL; + } else { +fprintf(stderr, "mutex_workqueue_pushchunked finished taskid %d\n", taskid); + task->tasktype = TASK_DONE; + } + break; + } + + /* Release lock */ + pthread_mutex_unlock( &tasklist_mutex ); + + io_trywrite( g_self_pipe[1], &byte, 1 ); +if(!task) +fprintf(stderr, "mutex_workqueue_pushchunked taskid %d not found\n", taskid); + + /* Indicate whether the worker has to throw away results */ + return task ? 0 : -1; +} + + +int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) { struct ot_task ** task; int64 sock = -1; + *is_partial = 0; + /* Want exclusive access to tasklist */ pthread_mutex_lock( &tasklist_mutex ); for (task = &tasklist; *task; task = &((*task)->next)) - if ((*task)->tasktype == TASK_DONE) { + if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) { struct ot_task *ptask = *task; - +fprintf(stderr, "Got task %d type %d with %d entries\n", (*task)->taskid, (*task)->tasktype, ptask->iovec_entries); *iovec_entries = ptask->iovec_entries; *iovec = ptask->iovec; sock = ptask->sock; - *task = ptask->next; - free( ptask ); + if ((*task)->tasktype == TASK_DONE) { + *task = ptask->next; + free( ptask ); + } else { + ptask->iovec_entries = 0; + ptask->iovec = NULL; + *is_partial = 1; + /* Prevent task from showing up immediately again unless new data was added */ + (*task)->tasktype = TASK_FULLSCRAPE; + } break; } -- cgit v1.2.3