summaryrefslogtreecommitdiff
path: root/ot_mutex.c
diff options
context:
space:
mode:
authorDirk Engling <erdgeist@erdgeist.org>2024-04-13 00:47:29 +0200
committerDirk Engling <erdgeist@erdgeist.org>2024-04-13 00:47:29 +0200
commit1a70d9f9ef81ac1b5e843ac71f3538f7845e03ae (patch)
tree20a20077503c01dc024e88a6a8d82bf89faf22fd /ot_mutex.c
parent301faeb10c5994a6fd31adc5f0b4f8f2b5c23502 (diff)
First shot on chunked transfers
Diffstat (limited to 'ot_mutex.c')
-rw-r--r--ot_mutex.c54
1 files changed, 49 insertions, 5 deletions
diff --git a/ot_mutex.c b/ot_mutex.c
index 497b1af..b61b915 100644
--- a/ot_mutex.c
+++ b/ot_mutex.c
@@ -17,6 +17,7 @@
17 17
18/* Opentracker */ 18/* Opentracker */
19#include "trackerlogic.h" 19#include "trackerlogic.h"
20#include "ot_iovec.h"
20#include "ot_mutex.h" 21#include "ot_mutex.h"
21#include "ot_stats.h" 22#include "ot_stats.h"
22 23
@@ -194,23 +195,66 @@ int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iove
194 return task ? 0 : -1; 195 return task ? 0 : -1;
195} 196}
196 197
197int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) { 198int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
199 struct ot_task * task;
200 const char byte = 'o';
201
202 /* Want exclusive access to tasklist */
203 pthread_mutex_lock( &tasklist_mutex );
204
205 for (task = tasklist; task; task = task->next)
206 if (task->taskid == taskid) {
207 if( iovec ) {
208fprintf(stderr, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid);
209 if (!iovec_append(&task->iovec_entries, &task->iovec, iovec) )
210 return -1;
211 task->tasktype = TASK_DONE_PARTIAL;
212 } else {
213fprintf(stderr, "mutex_workqueue_pushchunked finished taskid %d\n", taskid);
214 task->tasktype = TASK_DONE;
215 }
216 break;
217 }
218
219 /* Release lock */
220 pthread_mutex_unlock( &tasklist_mutex );
221
222 io_trywrite( g_self_pipe[1], &byte, 1 );
223if(!task)
224fprintf(stderr, "mutex_workqueue_pushchunked taskid %d not found\n", taskid);
225
226 /* Indicate whether the worker has to throw away results */
227 return task ? 0 : -1;
228}
229
230
231int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) {
198 struct ot_task ** task; 232 struct ot_task ** task;
199 int64 sock = -1; 233 int64 sock = -1;
200 234
235 *is_partial = 0;
236
201 /* Want exclusive access to tasklist */ 237 /* Want exclusive access to tasklist */
202 pthread_mutex_lock( &tasklist_mutex ); 238 pthread_mutex_lock( &tasklist_mutex );
203 239
204 for (task = &tasklist; *task; task = &((*task)->next)) 240 for (task = &tasklist; *task; task = &((*task)->next))
205 if ((*task)->tasktype == TASK_DONE) { 241 if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) {
206 struct ot_task *ptask = *task; 242 struct ot_task *ptask = *task;
207 243fprintf(stderr, "Got task %d type %d with %d entries\n", (*task)->taskid, (*task)->tasktype, ptask->iovec_entries);
208 *iovec_entries = ptask->iovec_entries; 244 *iovec_entries = ptask->iovec_entries;
209 *iovec = ptask->iovec; 245 *iovec = ptask->iovec;
210 sock = ptask->sock; 246 sock = ptask->sock;
211 247
212 *task = ptask->next; 248 if ((*task)->tasktype == TASK_DONE) {
213 free( ptask ); 249 *task = ptask->next;
250 free( ptask );
251 } else {
252 ptask->iovec_entries = 0;
253 ptask->iovec = NULL;
254 *is_partial = 1;
255 /* Prevent task from showing up immediately again unless new data was added */
256 (*task)->tasktype = TASK_FULLSCRAPE;
257 }
214 break; 258 break;
215 } 259 }
216 260