summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ot_mutex.c156
-rw-r--r--ot_mutex.h18
2 files changed, 174 insertions, 0 deletions
diff --git a/ot_mutex.c b/ot_mutex.c
index bb82f46..b39baaa 100644
--- a/ot_mutex.c
+++ b/ot_mutex.c
@@ -4,6 +4,8 @@
4/* System */ 4/* System */
5#include <pthread.h> 5#include <pthread.h>
6#include <stdio.h> 6#include <stdio.h>
7#include <stdlib.h>
8#include <sys/mman.h>
7 9
8/* Libowfat */ 10/* Libowfat */
9#include "byte.h" 11#include "byte.h"
@@ -15,6 +17,7 @@
15/* Our global all torrents list */ 17/* Our global all torrents list */
16static ot_vector all_torrents[OT_BUCKET_COUNT]; 18static ot_vector all_torrents[OT_BUCKET_COUNT];
17 19
20/* Bucket Magic */
18static int bucket_locklist[ OT_MAX_THREADS ]; 21static int bucket_locklist[ OT_MAX_THREADS ];
19static int bucket_locklist_count = 0; 22static int bucket_locklist_count = 0;
20static pthread_mutex_t bucket_mutex; 23static pthread_mutex_t bucket_mutex;
@@ -91,6 +94,159 @@ void mutex_bucket_unlock_by_hash( ot_hash *hash ) {
91 mutex_bucket_unlock( bucket ); 94 mutex_bucket_unlock( bucket );
92} 95}
93 96
97/* TaskQueue Magic */
98
99struct ot_task {
100 ot_taskid taskid;
101 ot_tasktype tasktype;
102 int64 socket;
103 int iovec_entries;
104 struct iovec *iovec;
105 struct ot_task *next;
106};
107
108static ot_taskid next_free_taskid = 1;
109static struct ot_task *tasklist = NULL;
110static pthread_mutex_t tasklist_mutex;
111static pthread_cond_t tasklist_being_filled;
112
113int mutex_workqueue_pushtask( int64 socket, ot_tasktype tasktype ) {
114 struct ot_task ** tmptask, * task;
115
116 /* Want exclusive access to tasklist */
117 pthread_mutex_lock( &tasklist_mutex );
118
119 task = malloc(sizeof( struct ot_task));
120 if( !task ) {
121 pthread_mutex_unlock( &tasklist_mutex );
122 return -1;
123 }
124
125 /* Skip to end of list */
126 tmptask = &tasklist;
127 while( *tmptask )
128 tmptask = &(*tmptask)->next;
129 *tmptask = task;
130
131 task->taskid = 0;
132 task->tasktype = tasktype;
133 task->socket = socket;
134 task->iovec_entries = 0;
135 task->iovec = NULL;
136 task->next = 0;
137
138 /* Inform waiting workers and release lock */
139 pthread_cond_broadcast( &tasklist_being_filled );
140 pthread_mutex_unlock( &tasklist_mutex );
141 return 0;
142}
143
144void mutex_workqueue_canceltask( int64 socket ) {
145 struct ot_task ** task;
146
147 /* Want exclusive access to tasklist */
148 pthread_mutex_lock( &tasklist_mutex );
149
150 task = &tasklist;
151 while( *task && ( (*task)->socket != socket ) )
152 *task = (*task)->next;
153
154 if( *task && ( (*task)->socket == socket ) ) {
155 struct iovec *iovec = (*task)->iovec;
156 struct ot_task *ptask = *task;
157 int i;
158
159 /* Free task's iovec */
160 for( i=0; i<(*task)->iovec_entries; ++i )
161 munmap( iovec[i].iov_base , iovec[i].iov_len );
162
163 *task = (*task)->next;
164 free( ptask );
165 }
166
167 /* Release lock */
168 pthread_mutex_unlock( &tasklist_mutex );
169}
170
171ot_taskid mutex_workqueue_poptask( ot_tasktype tasktype ) {
172 struct ot_task * task;
173 ot_taskid taskid = 0;
174
175 /* Want exclusive access to tasklist */
176 pthread_mutex_lock( &tasklist_mutex );
177
178 while( !taskid ) {
179 /* Skip to the first unassigned task this worker wants to do */
180 task = tasklist;
181 while( task && ( task->tasktype != tasktype ) && ( task->taskid ) )
182 task = task->next;
183
184 /* If we found an outstanding task, assign a taskid to it
185 and leave the loop */
186 if( task ) {
187 task->taskid = taskid = ++next_free_taskid;
188 break;
189 }
190
191 /* Wait until the next task is being fed */
192 pthread_cond_wait( &tasklist_being_filled, &tasklist_mutex );
193 }
194
195 /* Release lock */
196 pthread_mutex_unlock( &tasklist_mutex );
197
198 return taskid;
199}
200
201int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) {
202 struct ot_task * task;
203 /* Want exclusive access to tasklist */
204 pthread_mutex_lock( &tasklist_mutex );
205
206 task = tasklist;
207 while( task && ( task->taskid != taskid ) )
208 task = task->next;
209
210 if( task ) {
211 task->iovec_entries = iovec_entries;
212 task->iovec = iovec;
213 task->tasktype = OT_TASKTYPE_DONE;
214 }
215
216 /* Release lock */
217 pthread_mutex_unlock( &tasklist_mutex );
218
219 /* Indicate whether the worker has to throw away results */
220 return task ? 0 : -1;
221}
222
223int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) {
224 struct ot_task ** task;
225 int64 socket = -1;
226
227 /* Want exclusive access to tasklist */
228 pthread_mutex_lock( &tasklist_mutex );
229
230 task = &tasklist;
231 while( *task && ( (*task)->tasktype != OT_TASKTYPE_DONE ) )
232 *task = (*task)->next;
233
234 if( *task && ( (*task)->tasktype == OT_TASKTYPE_DONE ) ) {
235 struct ot_task *ptask = *task;
236
237 *iovec_entries = (*task)->iovec_entries;
238 *iovec = (*task)->iovec;
239 socket = (*task)->socket;
240
241 *task = (*task)->next;
242 free( ptask );
243 }
244
245 /* Release lock */
246 pthread_mutex_unlock( &tasklist_mutex );
247 return socket;
248}
249
94void mutex_init( ) { 250void mutex_init( ) {
95 pthread_mutex_init(&bucket_mutex, NULL); 251 pthread_mutex_init(&bucket_mutex, NULL);
96 pthread_cond_init (&bucket_being_unlocked, NULL); 252 pthread_cond_init (&bucket_being_unlocked, NULL);
diff --git a/ot_mutex.h b/ot_mutex.h
index 2d30c69..31a16ef 100644
--- a/ot_mutex.h
+++ b/ot_mutex.h
@@ -4,6 +4,9 @@
4#ifndef __OT_MUTEX_H__ 4#ifndef __OT_MUTEX_H__
5#define __OT_MUTEX_H__ 5#define __OT_MUTEX_H__
6 6
7#include "ot_iovec.h"
8#include "io.h"
9
7void mutex_init( ); 10void mutex_init( );
8void mutex_deinit( ); 11void mutex_deinit( );
9 12
@@ -13,4 +16,19 @@ ot_vector *mutex_bucket_lock_by_hash( ot_hash *hash );
13void mutex_bucket_unlock( int bucket ); 16void mutex_bucket_unlock( int bucket );
14void mutex_bucket_unlock_by_hash( ot_hash *hash ); 17void mutex_bucket_unlock_by_hash( ot_hash *hash );
15 18
19typedef enum {
20 OT_TASKTYPE_FULLSCRAPE,
21 OT_TASKTYPE_SYNC,
22 OT_TASKTYPE_DMEM,
23
24 OT_TASKTYPE_DONE
25} ot_tasktype;
26typedef unsigned long ot_taskid;
27
28int mutex_workqueue_pushtask( int64 socket, ot_tasktype tasktype );
29void mutex_workqueue_canceltask( int64 socket );
30ot_taskid mutex_workqueue_poptask( ot_tasktype tasktype );
31int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector );
32int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector );
33
16#endif 34#endif