summaryrefslogtreecommitdiff
path: root/ot_mutex.c
diff options
context:
space:
mode:
Diffstat (limited to 'ot_mutex.c')
-rw-r--r--ot_mutex.c165
1 files changed, 80 insertions, 85 deletions
diff --git a/ot_mutex.c b/ot_mutex.c
index 174c4ca..1aa2783 100644
--- a/ot_mutex.c
+++ b/ot_mutex.c
@@ -16,43 +16,39 @@
16#include "uint32.h" 16#include "uint32.h"
17 17
18/* Opentracker */ 18/* Opentracker */
19#include "trackerlogic.h"
20#include "ot_iovec.h" 19#include "ot_iovec.h"
21#include "ot_mutex.h" 20#include "ot_mutex.h"
22#include "ot_stats.h" 21#include "ot_stats.h"
22#include "trackerlogic.h"
23 23
24/* #define MTX_DBG( STRING ) fprintf( stderr, STRING ) */ 24/* #define MTX_DBG( STRING ) fprintf( stderr, STRING ) */
25#define MTX_DBG( STRING ) 25#define MTX_DBG(STRING)
26 26
27/* Our global all torrents list */ 27/* Our global all torrents list */
28static ot_vector all_torrents[OT_BUCKET_COUNT]; 28static ot_vector all_torrents[OT_BUCKET_COUNT];
29static pthread_mutex_t bucket_mutex[OT_BUCKET_COUNT]; 29static pthread_mutex_t bucket_mutex[OT_BUCKET_COUNT];
30static size_t g_torrent_count; 30static size_t g_torrent_count;
31 31
32/* Self pipe from opentracker.c */ 32/* Self pipe from opentracker.c */
33extern int g_self_pipe[2]; 33extern int g_self_pipe[2];
34 34
35ot_vector *mutex_bucket_lock( int bucket ) { 35ot_vector *mutex_bucket_lock(int bucket) {
36 pthread_mutex_lock(bucket_mutex + bucket ); 36 pthread_mutex_lock(bucket_mutex + bucket);
37 return all_torrents + bucket; 37 return all_torrents + bucket;
38} 38}
39 39
40ot_vector *mutex_bucket_lock_by_hash( ot_hash const hash ) { 40ot_vector *mutex_bucket_lock_by_hash(ot_hash const hash) { return mutex_bucket_lock(uint32_read_big((const char *)hash) >> OT_BUCKET_COUNT_SHIFT); }
41 return mutex_bucket_lock( uint32_read_big( (const char*)hash ) >> OT_BUCKET_COUNT_SHIFT );
42}
43 41
44void mutex_bucket_unlock( int bucket, int delta_torrentcount ) { 42void mutex_bucket_unlock(int bucket, int delta_torrentcount) {
45 pthread_mutex_unlock(bucket_mutex + bucket); 43 pthread_mutex_unlock(bucket_mutex + bucket);
46 g_torrent_count += delta_torrentcount; 44 g_torrent_count += delta_torrentcount;
47} 45}
48 46
49void mutex_bucket_unlock_by_hash( ot_hash const hash, int delta_torrentcount ) { 47void mutex_bucket_unlock_by_hash(ot_hash const hash, int delta_torrentcount) {
50 mutex_bucket_unlock( uint32_read_big( (char*)hash ) >> OT_BUCKET_COUNT_SHIFT, delta_torrentcount ); 48 mutex_bucket_unlock(uint32_read_big((char *)hash) >> OT_BUCKET_COUNT_SHIFT, delta_torrentcount);
51} 49}
52 50
53size_t mutex_get_torrent_count( ) { 51size_t mutex_get_torrent_count() { return g_torrent_count; }
54 return g_torrent_count;
55}
56 52
57/* TaskQueue Magic */ 53/* TaskQueue Magic */
58 54
@@ -65,16 +61,16 @@ struct ot_task {
65 struct ot_task *next; 61 struct ot_task *next;
66}; 62};
67 63
68static ot_taskid next_free_taskid = 1; 64static ot_taskid next_free_taskid = 1;
69static struct ot_task *tasklist; 65static struct ot_task *tasklist;
70static pthread_mutex_t tasklist_mutex; 66static pthread_mutex_t tasklist_mutex;
71static pthread_cond_t tasklist_being_filled; 67static pthread_cond_t tasklist_being_filled;
72 68
73int mutex_workqueue_pushtask( int64 sock, ot_tasktype tasktype ) { 69int mutex_workqueue_pushtask(int64 sock, ot_tasktype tasktype) {
74 struct ot_task ** tmptask, * task; 70 struct ot_task **tmptask, *task;
75 71
76 task = malloc(sizeof( struct ot_task)); 72 task = malloc(sizeof(struct ot_task));
77 if( !task ) 73 if (!task)
78 return -1; 74 return -1;
79 75
80 task->taskid = 0; 76 task->taskid = 0;
@@ -85,98 +81,98 @@ int mutex_workqueue_pushtask( int64 sock, ot_tasktype tasktype ) {
85 task->next = 0; 81 task->next = 0;
86 82
87 /* Want exclusive access to tasklist */ 83 /* Want exclusive access to tasklist */
88 pthread_mutex_lock( &tasklist_mutex ); 84 pthread_mutex_lock(&tasklist_mutex);
89 85
90 /* Skip to end of list */ 86 /* Skip to end of list */
91 tmptask = &tasklist; 87 tmptask = &tasklist;
92 while( *tmptask ) 88 while (*tmptask)
93 tmptask = &(*tmptask)->next; 89 tmptask = &(*tmptask)->next;
94 *tmptask = task; 90 *tmptask = task;
95 91
96 /* Inform waiting workers and release lock */ 92 /* Inform waiting workers and release lock */
97 pthread_cond_broadcast( &tasklist_being_filled ); 93 pthread_cond_broadcast(&tasklist_being_filled);
98 pthread_mutex_unlock( &tasklist_mutex ); 94 pthread_mutex_unlock(&tasklist_mutex);
99 return 0; 95 return 0;
100} 96}
101 97
102void mutex_workqueue_canceltask( int64 sock ) { 98void mutex_workqueue_canceltask(int64 sock) {
103 struct ot_task ** task; 99 struct ot_task **task;
104 100
105 /* Want exclusive access to tasklist */ 101 /* Want exclusive access to tasklist */
106 pthread_mutex_lock( &tasklist_mutex ); 102 pthread_mutex_lock(&tasklist_mutex);
107 103
108 for (task = &tasklist; *task; task = &((*task)->next)) 104 for (task = &tasklist; *task; task = &((*task)->next))
109 if ((*task)->sock == sock) { 105 if ((*task)->sock == sock) {
110 struct iovec *iovec = (*task)->iovec; 106 struct iovec *iovec = (*task)->iovec;
111 struct ot_task *ptask = *task; 107 struct ot_task *ptask = *task;
112 int i; 108 int i;
113 109
114 /* Free task's iovec */ 110 /* Free task's iovec */
115 for( i=0; i<(*task)->iovec_entries; ++i ) 111 for (i = 0; i < (*task)->iovec_entries; ++i)
116 free( iovec[i].iov_base ); 112 free(iovec[i].iov_base);
117 113
118 *task = (*task)->next; 114 *task = (*task)->next;
119 free( ptask ); 115 free(ptask);
120 break; 116 break;
121 } 117 }
122 118
123 /* Release lock */ 119 /* Release lock */
124 pthread_mutex_unlock( &tasklist_mutex ); 120 pthread_mutex_unlock(&tasklist_mutex);
125} 121}
126 122
127ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ) { 123ot_taskid mutex_workqueue_poptask(ot_tasktype *tasktype) {
128 struct ot_task * task; 124 struct ot_task *task;
129 ot_taskid taskid = 0; 125 ot_taskid taskid = 0;
130 126
131 /* Want exclusive access to tasklist */ 127 /* Want exclusive access to tasklist */
132 pthread_mutex_lock( &tasklist_mutex ); 128 pthread_mutex_lock(&tasklist_mutex);
133 129
134 while( !taskid ) { 130 while (!taskid) {
135 /* Skip to the first unassigned task this worker wants to do */ 131 /* Skip to the first unassigned task this worker wants to do */
136 for (task = tasklist; task; task = task->next) 132 for (task = tasklist; task; task = task->next)
137 if (!task->taskid && ( TASK_CLASS_MASK & task->tasktype ) == *tasktype) { 133 if (!task->taskid && (TASK_CLASS_MASK & task->tasktype) == *tasktype) {
138 /* If we found an outstanding task, assign a taskid to it 134 /* If we found an outstanding task, assign a taskid to it
139 and leave the loop */ 135 and leave the loop */
140 task->taskid = taskid = ++next_free_taskid; 136 task->taskid = taskid = ++next_free_taskid;
141 *tasktype = task->tasktype; 137 *tasktype = task->tasktype;
142 break; 138 break;
143 } 139 }
144 140
145 /* Wait until the next task is being fed */ 141 /* Wait until the next task is being fed */
146 if (!taskid) 142 if (!taskid)
147 pthread_cond_wait( &tasklist_being_filled, &tasklist_mutex ); 143 pthread_cond_wait(&tasklist_being_filled, &tasklist_mutex);
148 } 144 }
149 145
150 /* Release lock */ 146 /* Release lock */
151 pthread_mutex_unlock( &tasklist_mutex ); 147 pthread_mutex_unlock(&tasklist_mutex);
152 148
153 return taskid; 149 return taskid;
154} 150}
155 151
156void mutex_workqueue_pushsuccess( ot_taskid taskid ) { 152void mutex_workqueue_pushsuccess(ot_taskid taskid) {
157 struct ot_task ** task; 153 struct ot_task **task;
158 154
159 /* Want exclusive access to tasklist */ 155 /* Want exclusive access to tasklist */
160 pthread_mutex_lock( &tasklist_mutex ); 156 pthread_mutex_lock(&tasklist_mutex);
161 157
162 for (task = &tasklist; *task; task = &((*task)->next)) 158 for (task = &tasklist; *task; task = &((*task)->next))
163 if ((*task)->taskid == taskid) { 159 if ((*task)->taskid == taskid) {
164 struct ot_task *ptask = *task; 160 struct ot_task *ptask = *task;
165 *task = (*task)->next; 161 *task = (*task)->next;
166 free( ptask ); 162 free(ptask);
167 break; 163 break;
168 } 164 }
169 165
170 /* Release lock */ 166 /* Release lock */
171 pthread_mutex_unlock( &tasklist_mutex ); 167 pthread_mutex_unlock(&tasklist_mutex);
172} 168}
173 169
174int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovec ) { 170int mutex_workqueue_pushresult(ot_taskid taskid, int iovec_entries, struct iovec *iovec) {
175 struct ot_task * task; 171 struct ot_task *task;
176 const char byte = 'o'; 172 const char byte = 'o';
177 173
178 /* Want exclusive access to tasklist */ 174 /* Want exclusive access to tasklist */
179 pthread_mutex_lock( &tasklist_mutex ); 175 pthread_mutex_lock(&tasklist_mutex);
180 176
181 for (task = tasklist; task; task = task->next) 177 for (task = tasklist; task; task = task->next)
182 if (task->taskid == taskid) { 178 if (task->taskid == taskid) {
@@ -187,25 +183,25 @@ int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iove
187 } 183 }
188 184
189 /* Release lock */ 185 /* Release lock */
190 pthread_mutex_unlock( &tasklist_mutex ); 186 pthread_mutex_unlock(&tasklist_mutex);
191 187
192 io_trywrite( g_self_pipe[1], &byte, 1 ); 188 io_trywrite(g_self_pipe[1], &byte, 1);
193 189
194 /* Indicate whether the worker has to throw away results */ 190 /* Indicate whether the worker has to throw away results */
195 return task ? 0 : -1; 191 return task ? 0 : -1;
196} 192}
197 193
198int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) { 194int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
199 struct ot_task * task; 195 struct ot_task *task;
200 const char byte = 'o'; 196 const char byte = 'o';
201 197
202 /* Want exclusive access to tasklist */ 198 /* Want exclusive access to tasklist */
203 pthread_mutex_lock( &tasklist_mutex ); 199 pthread_mutex_lock(&tasklist_mutex);
204 200
205 for (task = tasklist; task; task = task->next) 201 for (task = tasklist; task; task = task->next)
206 if (task->taskid == taskid) { 202 if (task->taskid == taskid) {
207 if( iovec ) { 203 if (iovec) {
208 if (iovec_append(&task->iovec_entries, &task->iovec, iovec) ) 204 if (iovec_append(&task->iovec_entries, &task->iovec, iovec))
209 task->tasktype = TASK_DONE_PARTIAL; 205 task->tasktype = TASK_DONE_PARTIAL;
210 else 206 else
211 task = NULL; 207 task = NULL;
@@ -215,65 +211,64 @@ int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
215 } 211 }
216 212
217 /* Release lock */ 213 /* Release lock */
218 pthread_mutex_unlock( &tasklist_mutex ); 214 pthread_mutex_unlock(&tasklist_mutex);
219 215
220 io_trywrite( g_self_pipe[1], &byte, 1 ); 216 io_trywrite(g_self_pipe[1], &byte, 1);
221 217
222 /* Indicate whether the worker has to throw away results */ 218 /* Indicate whether the worker has to throw away results */
223 return task ? 0 : -1; 219 return task ? 0 : -1;
224} 220}
225 221
226 222int64 mutex_workqueue_popresult(int *iovec_entries, struct iovec **iovec, int *is_partial) {
227int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) { 223 struct ot_task **task;
228 struct ot_task ** task; 224 int64 sock = -1;
229 int64 sock = -1;
230 225
231 *is_partial = 0; 226 *is_partial = 0;
232 227
233 /* Want exclusive access to tasklist */ 228 /* Want exclusive access to tasklist */
234 pthread_mutex_lock( &tasklist_mutex ); 229 pthread_mutex_lock(&tasklist_mutex);
235 230
236 for (task = &tasklist; *task; task = &((*task)->next)) 231 for (task = &tasklist; *task; task = &((*task)->next))
237 if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) { 232 if (((*task)->tasktype & TASK_CLASS_MASK) == TASK_DONE) {
238 struct ot_task *ptask = *task; 233 struct ot_task *ptask = *task;
239 *iovec_entries = ptask->iovec_entries; 234 *iovec_entries = ptask->iovec_entries;
240 *iovec = ptask->iovec; 235 *iovec = ptask->iovec;
241 sock = ptask->sock; 236 sock = ptask->sock;
242 237
243 if ((*task)->tasktype == TASK_DONE) { 238 if ((*task)->tasktype == TASK_DONE) {
244 *task = ptask->next; 239 *task = ptask->next;
245 free( ptask ); 240 free(ptask);
246 } else { 241 } else {
247 ptask->iovec_entries = 0; 242 ptask->iovec_entries = 0;
248 ptask->iovec = NULL; 243 ptask->iovec = NULL;
249 *is_partial = 1; 244 *is_partial = 1;
250 /* Prevent task from showing up immediately again unless new data was added */ 245 /* Prevent task from showing up immediately again unless new data was added */
251 (*task)->tasktype = TASK_FULLSCRAPE; 246 (*task)->tasktype = TASK_FULLSCRAPE;
252 } 247 }
253 break; 248 break;
254 } 249 }
255 250
256 /* Release lock */ 251 /* Release lock */
257 pthread_mutex_unlock( &tasklist_mutex ); 252 pthread_mutex_unlock(&tasklist_mutex);
258 return sock; 253 return sock;
259} 254}
260 255
261void mutex_init( ) { 256void mutex_init() {
262 int i; 257 int i;
263 pthread_mutex_init(&tasklist_mutex, NULL); 258 pthread_mutex_init(&tasklist_mutex, NULL);
264 pthread_cond_init (&tasklist_being_filled, NULL); 259 pthread_cond_init(&tasklist_being_filled, NULL);
265 for (i=0; i < OT_BUCKET_COUNT; ++i) 260 for (i = 0; i < OT_BUCKET_COUNT; ++i)
266 pthread_mutex_init(bucket_mutex + i, NULL); 261 pthread_mutex_init(bucket_mutex + i, NULL);
267 byte_zero( all_torrents, sizeof( all_torrents ) ); 262 byte_zero(all_torrents, sizeof(all_torrents));
268} 263}
269 264
270void mutex_deinit( ) { 265void mutex_deinit() {
271 int i; 266 int i;
272 for (i=0; i < OT_BUCKET_COUNT; ++i) 267 for (i = 0; i < OT_BUCKET_COUNT; ++i)
273 pthread_mutex_destroy(bucket_mutex + i); 268 pthread_mutex_destroy(bucket_mutex + i);
274 pthread_mutex_destroy(&tasklist_mutex); 269 pthread_mutex_destroy(&tasklist_mutex);
275 pthread_cond_destroy(&tasklist_being_filled); 270 pthread_cond_destroy(&tasklist_being_filled);
276 byte_zero( all_torrents, sizeof( all_torrents ) ); 271 byte_zero(all_torrents, sizeof(all_torrents));
277} 272}
278 273
279const char *g_version_mutex_c = "$Source$: $Revision$\n"; 274const char *g_version_mutex_c = "$Source$: $Revision$\n";