source: trunk/sys/libphoenix/tpool.c @ 131

Last change on this file since 131 was 1, checked in by alain, 8 years ago

First import

File size: 8.0 KB
Line 
1/* Copyright (c) 2007-2009, Stanford University
2* All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*     * Redistributions of source code must retain the above copyright
7*       notice, this list of conditions and the following disclaimer.
8*     * Redistributions in binary form must reproduce the above copyright
9*       notice, this list of conditions and the following disclaimer in the
10*       documentation and/or other materials provided with the distribution.
11*     * Neither the name of Stanford University nor the names of its
12*       contributors may be used to endorse or promote products derived from
13*       this software without specific prior written permission.
14*
15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25*/ 
26
27#include <pthread.h>
28#include <assert.h>
29#include <semaphore.h>
30
31#include "atomic.h"
32#include "memory.h"
33#include "tpool.h"
34#include "stddefines.h"
35
36typedef struct {
37    sem_t           sem_run;
38    unsigned int    *num_workers_done;
39    sem_t           *sem_all_workers_done;
40    thread_func     *thread_func;
41    void            **thread_func_arg;
42    void            **ret;
43    int             *num_workers;
44    int             *die;
45} thread_arg_t;
46
47struct tpool_t {
48    int             num_threads;
49    int             num_workers;
50    int             die;
51    thread_func     thread_func;
52    sem_t           sem_all_workers_done;
53    unsigned int    num_workers_done;
54    void            **args;
55    pthread_t       *threads;
56    thread_arg_t    *thread_args;
57};
58
59static void* thread_loop (void *);
60
61tpool_t* tpool_create (int num_threads)
62{
63    int             i, ret;
64    tpool_t         *tpool;
65    pthread_attr_t  attr;
66    int             this_cpu_id;
67
68    tpool = mem_calloc (1, sizeof (tpool_t));
69    if (tpool == NULL) 
70        return NULL;
71
72    tpool->num_threads = num_threads;
73    tpool->num_workers = num_threads;
74
75    tpool->args = (void **)mem_malloc (sizeof (void *) * num_threads);
76    if (tpool->args == NULL) 
77        goto fail_args;
78
79    tpool->threads = (pthread_t *)mem_malloc (sizeof (pthread_t) * num_threads);
80    if (tpool->threads == NULL) 
81        goto fail_threads;
82
83    tpool->thread_args = (thread_arg_t *)mem_malloc (
84        sizeof (thread_arg_t) * num_threads);
85    if (tpool->thread_args == NULL) 
86        goto fail_thread_args;
87
88    ret = sem_init (&tpool->sem_all_workers_done, 0, 0);
89    if (ret != 0) 
90        goto fail_all_workers_done;
91
92    CHECK_ERROR (pthread_attr_init (&attr));
93    CHECK_ERROR (pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM));
94    CHECK_ERROR (pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED));   
95   
96#if USE_EXPLICIT_PLACEMENT
97    CHECK_ERROR (pthread_attr_getcpuid_np(&this_cpu_id));
98#endif
99
100    tpool->die = 0;
101
102    for (i = 0; i < num_threads; ++i) {
103        /* Initialize thread argument. */
104        CHECK_ERROR (sem_init (&(tpool->thread_args[i].sem_run), 0, 0));
105        tpool->thread_args[i].sem_all_workers_done = 
106            &tpool->sem_all_workers_done;
107        tpool->thread_args[i].num_workers_done = 
108            &tpool->num_workers_done;
109        tpool->thread_args[i].die = &tpool->die;
110        tpool->thread_args[i].thread_func = &tpool->thread_func;
111        tpool->thread_args[i].thread_func_arg = &tpool->args[i];
112        tpool->thread_args[i].ret = (void **)mem_malloc (sizeof (void *));
113        CHECK_ERROR (tpool->thread_args[i].ret == NULL);
114        tpool->thread_args[i].num_workers = &tpool->num_workers;
115
116#if USE_EXCPLICIT_PLACEMENT
117        if ((i+1) == this_cpu_id)
118          pthread_attr_setcpuid_np(&attr, 0, NULL);
119        else
120          pthread_attr_setcpuid_np(&attr, i+1, NULL);
121#endif
122
123        ret = pthread_create (&tpool->threads[i], &attr, thread_loop, &tpool->thread_args[i]);
124        if (ret) 
125            goto fail_thread_create;
126    }
127
128    return tpool;
129
130fail_thread_create:
131    --i;
132    while (i >= 0)
133    {
134    //    pthread_cancel (tpool->threads[i]);
135        --i;
136    }
137fail_all_workers_done:
138    mem_free (tpool->thread_args);
139fail_thread_args:
140    mem_free (tpool->threads);
141fail_threads:
142    mem_free (tpool->args);
143fail_args:
144
145    return NULL;
146}
147
148int tpool_set (
149    tpool_t *tpool, thread_func thread_func, void **args, int num_workers)
150{
151    int             i;
152   
153    assert (tpool != NULL);
154
155    tpool->thread_func = thread_func;
156
157    assert (num_workers <= tpool->num_threads);
158    tpool->num_workers = num_workers;
159
160    for (i = 0; i < num_workers; ++i)
161    {
162        tpool->args[i] = args[i];
163    }
164   
165
166    return 0;
167}
168
169int tpool_begin (tpool_t *tpool)
170{
171    int             i, ret;
172
173    assert (tpool != NULL);
174
175    if (tpool->num_workers == 0)
176        return 0;
177
178    tpool->num_workers_done = 0;
179
180    for (i = 0; i < tpool->num_workers; ++i) {
181        printf("reveille de %d\n",i);
182        ret = sem_post (&(tpool->thread_args[i].sem_run));
183        if (ret != 0) 
184            return -1;
185    }
186
187    return 0;
188}
189
190int tpool_wait (tpool_t *tpool)
191{
192    int             ret;
193
194    assert (tpool != NULL);
195
196    if (tpool->num_workers == 0)
197        return 0;
198
199    ret = sem_wait (&tpool->sem_all_workers_done);
200    if (ret != 0) 
201        return -1;
202
203    return 0;
204}
205
206void** tpool_get_results (tpool_t *tpool)
207{
208    int             i;
209    void            **rets;
210
211    assert (tpool != NULL);
212
213    rets = (void **)mem_malloc (sizeof (void *) * tpool->num_threads);
214    CHECK_ERROR (rets == NULL);
215
216    for (i = 0; i < tpool->num_threads; ++i) {
217        rets[i] = *(tpool->thread_args[i].ret);   
218    }
219
220    return rets;
221}
222
223int tpool_destroy (tpool_t *tpool)
224{
225    int             i;
226    int             result;
227   
228    assert (tpool != NULL);
229    assert (tpool->die == 0);
230
231    result = 0;
232    tpool->num_workers = tpool->num_threads;
233    tpool->num_workers_done = 0;
234   
235    for (i = 0; i < tpool->num_threads; ++i) {
236        mem_free (tpool->thread_args[i].ret);
237
238        tpool->die = 1;
239        sem_post(&tpool->thread_args[i].sem_run);
240    }
241
242    sem_wait(&tpool->sem_all_workers_done);
243
244    sem_destroy(&tpool->sem_all_workers_done);
245    mem_free (tpool->args);
246    mem_free (tpool->threads);
247    mem_free (tpool->thread_args);
248
249    mem_free (tpool);
250
251    return result;
252}
253
254static void* thread_loop (void *arg)
255{
256    thread_arg_t    *thread_arg = arg;
257    thread_func     thread_func;
258    void            *thread_func_arg;
259    void            **ret;
260    int             num_workers_done;
261   
262    assert (thread_arg);
263
264    while (1)
265    {
266        CHECK_ERROR (sem_wait (&thread_arg->sem_run));
267        if (*thread_arg->die) {
268            break;
269        }
270        thread_func = *(thread_arg->thread_func);
271        thread_func_arg = *(thread_arg->thread_func_arg);
272        ret = thread_arg->ret;
273
274        /* Run thread function. */
275        *ret = (*thread_func)(thread_func_arg);
276        num_workers_done = fetch_and_inc(thread_arg->num_workers_done) + 1;
277        if (num_workers_done == *thread_arg->num_workers)
278        {
279            /* Everybody's done. */
280            CHECK_ERROR (sem_post (thread_arg->sem_all_workers_done));
281        }
282    }
283
284    sem_destroy (&thread_arg->sem_run);
285    num_workers_done = fetch_and_inc(thread_arg->num_workers_done) + 1;
286    if (num_workers_done == *thread_arg->num_workers)
287    {
288        /* Everybody's done. */
289        CHECK_ERROR (sem_post (thread_arg->sem_all_workers_done));
290    }
291   
292    return NULL;
293}
Note: See TracBrowser for help on using the repository browser.