source: trunk/sys/libphoenix/taskQ.c @ 65

Last change on this file since 65 was 1, checked in by alain, 9 years ago

First import

File size: 12.1 KB
RevLine 
[1]1/* Copyright (c) 2007-2009, Stanford University
2* All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*     * Redistributions of source code must retain the above copyright
7*       notice, this list of conditions and the following disclaimer.
8*     * Redistributions in binary form must reproduce the above copyright
9*       notice, this list of conditions and the following disclaimer in the
10*       documentation and/or other materials provided with the distribution.
11*     * Neither the name of Stanford University nor the names of its
12*       contributors may be used to endorse or promote products derived from
13*       this software without specific prior written permission.
14*
15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25*/ 
26
27#include <assert.h>
28#include <stdlib.h>
29#include <stdio.h>
30#include <inttypes.h>
31#include <string.h>
32
33#include "memory.h"
34#include "taskQ.h"
35#include "queue.h"
36#include "synch.h"
37#include "locality.h"
38
39static int num_strands_per_chip = 0;
40
41typedef struct {
42    task_t              task; 
43    queue_elem_t        queue_elem;
44} tq_entry_t;
45
46typedef struct {
47    mr_lock_t  parent;
48    uintptr_t  chksum;
49    mr_lock_t  *per_thread;
50} tq_lock_t;
51
52struct taskQ_t {
53    int             num_queues;
54    int             num_threads;
55    queue_t         **queues;
56    queue_t         **free_queues;
57    tq_lock_t       *locks;
58    /* putting all seeds together may lead to extra coherence traffic among cpus
59     * if it's a problem we can pad it by l1 line size */
60    /* per-thread random seed */
61    unsigned int    *seeds;
62 };
63
64typedef int (*dequeue_fn)(taskQ_t *, int, int, queue_elem_t**);
65
66static inline taskQ_t* tq_init_normal(int numThreads);
67static inline void tq_finalize_normal(taskQ_t* tq);
68static inline int tq_dequeue_normal(
69    taskQ_t* tq, task_t* task, int lgrp, int tid);
70static inline int tq_dequeue_normal_seq (
71    taskQ_t* tq, task_t* task, int lgrp, int tid);
72static inline int tq_dequeue_normal_internal (
73    taskQ_t* tq, task_t* task, int lgrp, int tid, dequeue_fn dequeue_fn);
74
75static queue_t* tq_alloc_queue(void);
76static void tq_free_queue(queue_t* q);
77static int tq_queue_init(taskQ_t* tq, unsigned int idx);
78static void tq_queue_destroy(taskQ_t* tq, unsigned int idx);
79static void tq_empty_queue(queue_t* q);
80
81taskQ_t* tq_init (int num_threads)
82{
83    return tq_init_normal(num_threads);
84}
85
86void tq_reset (taskQ_t* tq, int num_threads)
87{
88}
89
90/**
91 * Initialize task queue for a normal machine
92 */
93static inline taskQ_t* tq_init_normal(int numThreads)
94{
95    int             i;
96    taskQ_t         *tq = NULL;
97
98    tq = mem_calloc(1, sizeof(taskQ_t));
99    if (tq == NULL) {
100        return NULL;
101    }
102
103    /* XXX should this be local? */
104    num_strands_per_chip = loc_get_lgrp_size ();
105    tq->num_threads = numThreads;
106    tq->num_queues = tq->num_threads / num_strands_per_chip;
107
108    if (tq->num_queues == 0)
109        tq->num_queues = 1;
110
111    tq->queues = (queue_t **)mem_calloc (tq->num_queues, sizeof (queue_t *));
112    if (tq->queues == NULL) goto fail_queues;
113
114    tq->free_queues = (queue_t **)mem_calloc (
115        tq->num_queues, sizeof (queue_t *));
116    if (tq->free_queues == NULL) goto fail_free_queues;
117
118    tq->locks = (tq_lock_t *)mem_calloc (tq->num_queues, sizeof (tq_lock_t));
119    if (tq->locks == NULL) goto fail_locks;
120
121    tq->seeds = (unsigned int*)mem_calloc(
122        tq->num_threads, sizeof(unsigned int));
123    if (tq->seeds == NULL) goto fail_seeds;
124    mem_memset(tq->seeds, 0, sizeof(unsigned int) * tq->num_threads);
125
126    for (i = 0; i < tq->num_queues; ++i)
127        if (!tq_queue_init(tq, i))
128            goto fail_tq_init;
129
130    return tq;
131
132fail_tq_init:
133    /* destroy all queues that have been allocated */
134    i--;
135    while (i >= 0) {
136        tq_queue_destroy(tq, i);
137        --i;
138    }
139    mem_free(tq->seeds);
140fail_seeds:
141    mem_free(tq->locks);
142fail_locks:
143    mem_free(tq->free_queues);
144fail_free_queues:
145    mem_free(tq->queues);
146fail_queues:
147    mem_free(tq);
148    return NULL;
149}
150
151/**
152 * Destroys an initialized queue (i.e. free queue and alloc queue) in task queue
153 * @param tq    tq to index
154 * @param idx   index of queue to destroy in tq
155 */
156static void tq_queue_destroy(taskQ_t* tq, unsigned int idx)
157{
158    int             j;
159    uintptr_t       chksum;
160
161//    assert (idx < tq->num_queues);
162
163    tq_empty_queue(tq->queues[idx]);
164    tq_free_queue(tq->queues[idx]);
165
166    tq_empty_queue(tq->free_queues[idx]);
167    tq_free_queue(tq->free_queues[idx]);
168
169    /* free all lock data associated with queue */
170    chksum = 0;
171    for (j = 0; j < tq->num_threads; j++) {
172        chksum += (uintptr_t)tq->locks[idx].per_thread[j];
173        lock_free_per_thread(tq->locks[idx].per_thread[j]);
174    }
175
176    lock_free (tq->locks[idx].parent);
177
178    mem_free (tq->locks[idx].per_thread);
179    tq->locks[idx].per_thread = NULL;
180}
181
182/**
183 * Initialize a queue for a given index in the task queue
184 * @return zero on failure, nonzero on success
185 */
186static int tq_queue_init(taskQ_t* tq, unsigned int idx)
187{
188    int     j;
189
190    assert (idx < (unsigned)tq->num_queues);
191
192    tq->queues[idx] = tq_alloc_queue();
193    if (tq->queues[idx] == NULL) return 0;
194
195    tq->free_queues[idx] = tq_alloc_queue();
196    if (tq->free_queues[idx] == NULL) goto fail_free_queue;
197
198    tq->locks[idx].parent = lock_alloc();
199
200    tq->locks[idx].per_thread = (mr_lock_t *)mem_calloc(
201        tq->num_threads, sizeof(mr_lock_t));
202    if (tq->locks[idx].per_thread == NULL) goto fail_priv_alloc;
203
204    tq->locks[idx].chksum = 0;
205    for (j = 0; j < tq->num_threads; ++j) {
206        mr_lock_t   per_thread;
207        per_thread = lock_alloc_per_thread(tq->locks[idx].parent);
208        tq->locks[idx].per_thread[j] = per_thread;
209        tq->locks[idx].chksum += (uintptr_t)per_thread;
210    }
211
212    return 1;
213
214fail_priv_alloc:
215    lock_free(&tq->locks[idx].parent);
216    tq_free_queue(tq->free_queues[idx]);
217    tq->free_queues[idx] = NULL;
218fail_free_queue:
219    tq_free_queue(tq->queues[idx]);
220    tq->queues[idx] = NULL;
221
222    return 0;
223}
224
225/**
226 * Allocates an initialized queue
227 * @return NULL on failure, initialized queue pointer on success
228 */
229static queue_t* tq_alloc_queue(void)
230{
231    queue_t *q;
232
233    q = (queue_t*) mem_malloc (sizeof(queue_t));
234    if (q == NULL) {
235        return NULL;
236    }
237
238    queue_init(q);
239
240    return q;
241}
242
243/**
244 * Frees an initialized queue that was allocated on the heap.
245 */
246static void tq_free_queue(queue_t* q)
247{
248    mem_free(q);
249}
250
251/**
252 * Empties out a queue in the task queue by dequeuing and freeing
253 * every task.
254 */
255static void tq_empty_queue(queue_t* q)
256{
257    do {
258        tq_entry_t      *entry;
259        queue_elem_t    *queue_elem;
260
261        if (queue_pop_front (q, &queue_elem) == 0)
262            break;
263
264        entry = queue_entry (queue_elem, tq_entry_t, queue_elem);
265        assert (entry != NULL);
266        mem_free (entry);
267    } while (1);
268}
269
270static inline void tq_finalize_normal(taskQ_t* tq)
271{
272    int i;
273
274    assert (tq->queues != NULL);
275    assert (tq->free_queues != NULL);
276    assert (tq->locks != NULL);
277
278    /* destroy all queues */
279    for (i = 0; i < tq->num_queues; ++i) {
280        tq_queue_destroy(tq, i);
281    }
282
283    /* destroy all first level pointers in tq */
284    mem_free (tq->queues);
285    mem_free (tq->free_queues);
286    mem_free (tq->locks);
287    mem_free (tq->seeds);
288
289    /* finally kill tq */
290    mem_free (tq);
291}
292
293void tq_finalize (taskQ_t* tq)
294{
295    tq_finalize_normal(tq);
296}
297
298/* Queue TASK at LGRP task queue with locking.
299   LGRP is a locality hint denoting to which locality group this task
300   should be queued at. If LGRP is less than 0, the locality group is
301   randomly selected. TID is required for MCS locking. */
302int tq_enqueue (taskQ_t* tq, task_t *task, int lgrp, int tid)
303{
304    tq_entry_t      *entry;
305    int             index;
306
307    assert (tq != NULL);
308    assert (task != NULL);
309
310    entry = (tq_entry_t *)mem_malloc (sizeof (tq_entry_t));
311    if (entry == NULL) {
312        return -1;
313    }
314
315    mem_memcpy (&entry->task, task, sizeof (task_t));
316
317    index = (lgrp < 0) ? rand_r(&tq->seeds[tid]) : lgrp;
318    index %= tq->num_queues;
319
320    lock_acquire (tq->locks[index].per_thread[tid]);
321    queue_push_back (tq->queues[index], &entry->queue_elem);
322    lock_release (tq->locks[index].per_thread[tid]);
323
324    return 0;
325}
326
327/* Queue TASK at LGRP task queue without locking.
328   LGRP is a locality hint denoting to which locality group this task
329   should be queued at. If LGRP is less than 0, the locality group is
330   randomly selected. */
331int tq_enqueue_seq (taskQ_t* tq, task_t *task, int lgrp)
332{
333    tq_entry_t      *entry;
334    int             index;
335
336    assert (task != NULL);
337
338    entry = (tq_entry_t *)mem_malloc (sizeof (tq_entry_t));
339    if (entry == NULL) {
340        return -1;
341    }
342
343    mem_memcpy (&entry->task, task, sizeof (task_t));
344
345    index = (lgrp < 0) ? rand() % tq->num_queues : lgrp % tq->num_queues;
346    queue_push_back (tq->queues[index], &entry->queue_elem);
347
348    return 0;
349}
350
351/**
352 * Safely dequeues an element from the normal queue and queues onto free queue
353 * @param tq        taskQ to operate on
354 * @param idx       index of queue to use
355 * @param tid       task id
356 * @param qe        queue element of element we operated on
357 * @return nonzero if normal queue was empty
358 */
359static inline int tq_elem_into_free_seq (
360    taskQ_t* tq, int idx, int tid, queue_elem_t** qe)
361{
362    queue_elem_t    *queue_elem = NULL;
363    int             ret;
364
365    ret = queue_pop_front (tq->queues[idx], &queue_elem);
366    if (ret != 0)
367        queue_push_back (tq->free_queues[idx], queue_elem);
368
369    *qe = queue_elem;
370
371    return ret;
372}
373
374/**
375 * Safely dequeues an element from the normal queue and queues onto free queue
376 * @param tq        taskQ to operate on
377 * @param idx       index of queue to use
378 * @param tid       task id
379 * @param qe        queue element of element we operated on
380 * @return nonzero if normal queue was empty
381 */
382static inline int tq_elem_into_free (
383    taskQ_t* tq, int idx, int tid, queue_elem_t** qe)
384{
385    int             ret;
386
387    lock_acquire (tq->locks[idx].per_thread[tid]);
388    ret = tq_elem_into_free_seq (tq, idx, tid, qe);
389    lock_release (tq->locks[idx].per_thread[tid]);
390
391    return ret;
392}
393
394static inline int tq_dequeue_normal_seq (
395    taskQ_t* tq, task_t* task, int lgrp, int tid)
396{
397    return tq_dequeue_normal_internal (
398        tq, task, lgrp, tid, tq_elem_into_free_seq);
399}
400
401static inline int tq_dequeue_normal(
402    taskQ_t* tq, task_t* task, int lgrp, int tid)
403{
404    return tq_dequeue_normal_internal (
405        tq, task, lgrp, tid, tq_elem_into_free);
406}
407
408static inline int tq_dequeue_normal_internal (
409    taskQ_t* tq, task_t* task, int lgrp, int tid, dequeue_fn dequeue_fn)
410{
411    int             i, ret, index;
412    queue_elem_t    *queue_elem;
413    tq_entry_t      *entry;
414
415    assert (task != NULL);
416
417    mem_memset (task, 0, sizeof (task_t));
418
419    index = (lgrp < 0) ? rand_r(&tq->seeds[tid]) : lgrp;
420    index %= tq->num_queues;
421    ret = (*dequeue_fn)(tq, index, tid, &queue_elem);
422
423   /* Do task stealing if nothing on our queue.
424      Cycle through all indexes until success or exhaustion */
425    for (i = (index + 1) % tq->num_queues;
426        (ret == 0) && (i != index);
427        i = (i + 1) % tq->num_queues)
428    {
429        ret = (*dequeue_fn)(tq, i, tid, &queue_elem);
430    }
431
432    if (ret == 0) {
433        /* There really is no more work. */
434        return 0;
435    }
436
437    entry = queue_entry (queue_elem, tq_entry_t, queue_elem);
438    assert (entry != NULL);
439
440    mem_memcpy (task, &entry->task, sizeof (task_t));
441
442    return 1;
443}
444
445int tq_dequeue (taskQ_t* tq, task_t *task, int lgrp, int tid)
446{
447    return tq_dequeue_normal(tq, task, lgrp, tid);
448}
Note: See TracBrowser for help on using the repository browser.