source: trunk/sys/libphoenix/map_reduce.c @ 240

Last change on this file since 240 was 1, checked in by alain, 8 years ago

First import

File size: 52.2 KB
RevLine 
[1]1/* Copyright (c) 2007-2009, Stanford University
2* All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*     * Redistributions of source code must retain the above copyright
7*       notice, this list of conditions and the following disclaimer.
8*     * Redistributions in binary form must reproduce the above copyright
9*       notice, this list of conditions and the following disclaimer in the
10*       documentation and/or other materials provided with the distribution.
11*     * Neither the name of Stanford University nor the names of its
12*       contributors may be used to endorse or promote products derived from
13*       this software without specific prior written permission.
14*
15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25*/ 
26
27#include <assert.h>
28#include <pthread.h>
29#include <unistd.h>
30#include <sys/types.h>
31#include <sys/syscall.h>
32#include <unistd.h>
33#include <stdlib.h>
34#include <stdio.h>
35#include <errno.h>
36#include <string.h>
37#include <strings.h>
38#include <sys/stat.h>
39#include <fcntl.h>
40#include <inttypes.h>
41#include <sys/mman.h>
42#include <time.h>
43
44#include "map_reduce.h"
45#include "memory.h"
46#include "processor.h"
47#include "defines.h"
48#include "scheduler.h"
49#include "synch.h"
50#include "taskQ.h"
51#include "queue.h"
52#include "stddefines.h"
53#include "iterator.h"
54#include "locality.h"
55#include "struct.h"
56#include "tpool.h"
57
58#undef TIMING
59
60/* Begin tunables. */
61//#define INCREMENTAL_COMBINER
62
63#define DEFAULT_NUM_REDUCE_TASKS    256
64#define EXTENDED_NUM_REDUCE_TASKS   (DEFAULT_NUM_REDUCE_TASKS * 128)
65#define DEFAULT_CACHE_SIZE          (16 * 1024)
66//#define DEFAULT_CACHE_SIZE        (8 * 1024)
67#define DEFAULT_KEYVAL_ARR_LEN      10
68#define DEFAULT_VALS_ARR_LEN        10
69#define L2_CACHE_LINE_SIZE          64
70/* End tunables. */
71
72/* Debug printf */
73#ifdef dprintf
74#undef dprintf
75#define dprintf(...) printf(__VA_ARGS__)
76#endif
77
78#define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
79#define MAX(X,Y) ((X) > (Y) ? (X) : (Y))
80#define OUT_PREFIX "[Phoenix] "
81
82/* A key and a value pair. */
83typedef struct 
84{
85    /* TODO add static assertion to make sure this fits in L2 line */
86    union {
87        struct {
88            int         len;
89            int         alloc_len;
90            int         pos;
91            keyval_t    *arr;
92        };
93        char pad[L2_CACHE_LINE_SIZE];
94    };
95} keyval_arr_t;
96
97/* Array of keyvals_t. */
98typedef struct 
99{
100    int len;
101    int alloc_len;
102    int pos;
103    keyvals_t *arr;
104} keyvals_arr_t;
105
106/* Thread information.
107   Denotes the id and the assigned CPU of a thread. */
108typedef struct 
109{
110    union {
111        struct {
112            pthread_t tid;
113            int curr_task;
114        };
115        char pad[L2_CACHE_LINE_SIZE];
116    };
117} thread_info_t;
118
119typedef struct
120{
121    uintptr_t work_time;
122    uintptr_t user_time;
123    uintptr_t combiner_time;
124} thread_timing_t;
125
126typedef struct {
127    task_t          task;
128    queue_elem_t    queue_elem;
129} task_queued;
130
131/* Internal map reduce state. */
132typedef struct
133{
134    /* Parameters. */
135    int num_map_tasks;              /* # of map tasks. */
136    int num_reduce_tasks;           /* # of reduce tasks. */
137    int chunk_size;                 /* # of units of data for each map task. */
138    int num_procs;                  /* # of processors to run on. */
139    int num_map_threads;            /* # of threads for map tasks. */
140    int num_reduce_threads;         /* # of threads for reduce tasks. */
141    int num_merge_threads;          /* # of threads for merge tasks. */
142    float key_match_factor;         /* # of values likely to be matched
143                                       to the same key. */
144
145    bool oneOutputQueuePerMapTask;      /* One output queue per map task? */
146    bool oneOutputQueuePerReduceTask;   /* One output queue per reduce task? */
147
148    int intermediate_task_alloc_len;
149
150    /* Callbacks. */
151    map_t map;                      /* Map function. */
152    reduce_t reduce;                /* Reduce function. */
153    combiner_t combiner;            /* Combiner function. */
154    partition_t partition;          /* Partition function. */     
155    splitter_t splitter;            /* Splitter function. */
156    locator_t locator;              /* Locator function. */
157    key_cmp_t key_cmp;              /* Key comparator function. */
158
159    /* Structures. */
160    map_reduce_args_t * args;       /* Args passed in by the user. */
161    thread_info_t * tinfo;          /* Thread information array. */
162
163    keyvals_arr_t **intermediate_vals;
164                                    /* Array to send to reduce task. */
165
166    keyval_arr_t *final_vals;       /* Array to send to merge task. */
167    keyval_arr_t *merge_vals;       /* Array to send to user. */
168
169    int splitter_pos;         /* Tracks position in array_splitter(). */
170
171    /* Policy for mapping threads to cpus. */
172    sched_policy    *schedPolicies[TASK_TYPE_TOTAL];
173
174
175    taskQ_t         *taskQueue;     /* Queues of tasks. */
176    tpool_t         *tpool;         /* Thread pool. */
177} mr_env_t;
178
179#ifdef TIMING
180static pthread_key_t emit_time_key;
181#endif
182static pthread_key_t env_key;       /* Environment for current thread. */
183static pthread_key_t tpool_key;
184static pthread_key_t thread_index_key;
185
186/* Data passed on to each worker thread. */
187typedef struct
188{
189    int             cpu_id;             /* CPU this thread is to run. */
190    int             thread_id;          /* Thread index. */
191    TASK_TYPE_T     task_type;          /* Assigned task type. */
192    int             merge_len;
193    keyval_arr_t    *merge_input;
194    int             merge_round;
195    mr_env_t        *env;
196} thread_arg_t;
197
198static inline mr_env_t* env_init (map_reduce_args_t *);
199static void env_fini(mr_env_t* env);
200static inline void env_print (mr_env_t* env);
201static inline void start_workers (mr_env_t* env, thread_arg_t *);
202static inline void *start_my_work (thread_arg_t *);
203static inline void emit_inline (mr_env_t* env, void *, void *);
204static inline mr_env_t* get_env(void);
205static inline int getCurrThreadIndex (TASK_TYPE_T);
206static inline int getNumTaskThreads (mr_env_t* env, TASK_TYPE_T);
207static inline void insert_keyval (
208    mr_env_t* env, keyval_arr_t *, void *, void *);
209static inline void insert_keyval_merged (
210    mr_env_t* env, keyvals_arr_t *, void *, void *);
211
212static int array_splitter (void *, int, map_args_t *);
213static void identity_reduce (void *, iterator_t *itr);
214static inline void merge_results (mr_env_t* env, keyval_arr_t*, int);
215
216static void *map_worker (void *);
217static void *reduce_worker (void *);
218static void *merge_worker (void *);
219
220static int gen_map_tasks (mr_env_t* env);
221static int gen_map_tasks_split(mr_env_t* env, queue_t* q);
222static int gen_reduce_tasks (mr_env_t* env);
223
224static void map(mr_env_t* mr);
225static void reduce(mr_env_t* mr);
226static void merge(mr_env_t* mr);
227
228#ifndef INCREMENTAL_COMBINER
229static void run_combiner (mr_env_t* env, int thread_idx);
230#endif
231
232int 
233map_reduce_init ()
234{
235    CHECK_ERROR (pthread_key_create (&tpool_key, NULL));
236    CHECK_ERROR (pthread_key_create (&thread_index_key, NULL));
237    return 0;
238}
239
240int
241map_reduce (map_reduce_args_t * args)
242{
243    struct timeval begin, end;
244    mr_env_t* env;
245
246    assert (args != NULL);
247    assert (args->map != NULL);
248    assert (args->key_cmp != NULL);
249    assert (args->unit_size > 0);
250    assert (args->result != NULL);
251
252    get_time (&begin);
253
254    /* Initialize environment. */
255    env = env_init (args);
256    if (env == NULL) {
257       /* could not allocate environment */
258       return -1;
259    }
260    //env_print (env);
261    env->taskQueue = tq_init (env->num_map_threads);
262    assert (env->taskQueue != NULL);
263
264    /* Reuse thread pool. */
265    env->tpool = pthread_getspecific (tpool_key);
266    if (env->tpool == NULL) {
267        tpool_t *tpool;
268
269        tpool = tpool_create (env->num_map_threads-1);
270        CHECK_ERROR (tpool == NULL);
271
272        env->tpool = tpool;
273        CHECK_ERROR (pthread_setspecific (tpool_key, tpool));
274    }
275
276#ifdef TIMING
277    CHECK_ERROR (pthread_key_create (&emit_time_key, NULL));
278#endif
279    CHECK_ERROR (pthread_key_create (&env_key, NULL));
280
281    pthread_setspecific (env_key, env);
282
283    get_time (&end);
284
285#if 1//TIMING
286    fprintf (stderr, "library init: %u\n", time_diff (&end, &begin));
287#endif
288
289    /* Run map tasks and get intermediate values. */
290    get_time (&begin);
291    map (env);
292    get_time (&end);
293
294#if 1//TIMING
295    fprintf (stderr, "map phase: %u\n", time_diff (&end, &begin));
296#endif
297
298    dprintf("In scheduler, all map tasks are done, now scheduling reduce tasks\n");
299   
300    /* Run reduce tasks and get final values. */
301    get_time (&begin);
302    reduce (env);
303    get_time (&end);
304
305#if 1//TIMING
306    fprintf (stderr, "reduce phase: %u\n", time_diff (&end, &begin));
307#endif
308
309    dprintf("In scheduler, all reduce tasks are done, now scheduling merge tasks\n");
310
311    get_time (&begin);
312    merge (env);
313    get_time (&end);
314
315#if 1//TIMING
316    fprintf (stderr, "merge phase: %u\n", time_diff (&end, &begin));
317#endif
318
319    /* Cleanup. */
320    get_time (&begin);
321    env_fini(env);
322    CHECK_ERROR (pthread_key_delete (env_key));
323    get_time (&end);
324
325#if 1//TIMING
326    fprintf (stderr, "library finalize: %u\n", time_diff (&end, &begin));
327    //CHECK_ERROR (pthread_key_delete (emit_time_key));
328#endif
329
330    return 0;
331}
332
333int map_reduce_finalize ()
334{
335    tpool_t *tpool;
336
337    tpool = pthread_getspecific (tpool_key);
338    CHECK_ERROR (tpool_destroy (tpool));
339
340    pthread_key_delete (tpool_key);
341
342    return 0;
343}
344
345/**
346 * Frees memory used by map reduce environment once map reduce has completed.
347 * Frees environment pointer.
348 */
349static void env_fini (mr_env_t* env)
350{
351    int i;
352
353    tq_finalize (env->taskQueue);
354
355    for (i = 0; i < TASK_TYPE_TOTAL; i++)
356        sched_policy_put(env->schedPolicies[i]);
357
358    mem_free (env);
359}
360
361/* Setup global state. */
362static mr_env_t* 
363env_init (map_reduce_args_t *args) 
364{
365    mr_env_t    *env;
366    int         i;
367    int         num_procs;
368
369    env = mem_malloc (sizeof (mr_env_t));
370    if (env == NULL) {
371       return NULL;
372    }
373
374    mem_memset (env, 0, sizeof (mr_env_t));
375
376    env->args = args;
377
378    /* 1. Determine paramenters. */
379
380    /* Determine the number of processors to use. */
381    num_procs = proc_get_num_cpus ();
382    if (args->num_procs > 0)
383    {
384        /* Can't have more processors than there are physically present. */
385        CHECK_ERROR (args->num_procs > num_procs);
386        num_procs = args->num_procs;
387    }
388    env->num_procs = num_procs;
389
390    env->oneOutputQueuePerMapTask = false;
391    env->oneOutputQueuePerReduceTask = args->use_one_queue_per_task;
392
393    /* Determine the number of threads to schedule for each type of task. */
394    env->num_map_threads = (args->num_map_threads > 0) ? 
395        args->num_map_threads : num_procs;
396
397    env->num_reduce_threads = (args->num_reduce_threads > 0) ? 
398        args->num_reduce_threads : num_procs;
399
400    env->num_merge_threads = (args->num_merge_threads > 0) ? 
401        args->num_merge_threads : env->num_reduce_threads;
402
403    if (env->oneOutputQueuePerReduceTask == false) {
404        env->num_merge_threads = env->num_reduce_threads / 2;
405    }
406
407    /* Assign at least one merge thread. */
408    env->num_merge_threads = MAX(env->num_merge_threads, 1);
409
410    env->key_match_factor = (args->key_match_factor > 0) ? 
411        args->key_match_factor : 2;
412
413    /* Set num_map_tasks to 0 since we cannot anticipate how many map tasks
414       there would be. This does not matter since map workers will loop
415       until there is no more data left. */
416    env->num_map_tasks = 0;
417
418    if (args->L1_cache_size > 0)
419    {
420        env->chunk_size = args->L1_cache_size / args->unit_size;
421        env->num_reduce_tasks = (int)
422            ( (env->key_match_factor * args->data_size) /
423                args->L1_cache_size);
424    }
425    else
426    {
427        env->chunk_size = DEFAULT_CACHE_SIZE / args->unit_size;
428        env->num_reduce_tasks = (int) 
429            ( (env->key_match_factor * args->data_size) / 
430                DEFAULT_CACHE_SIZE );
431    }
432
433    if (env->num_reduce_tasks <= 0) env->num_reduce_tasks = 1;
434    if (env->chunk_size <= 0) env->chunk_size = 1;
435
436    if (env->oneOutputQueuePerReduceTask == false) 
437    {
438        env->num_reduce_tasks = EXTENDED_NUM_REDUCE_TASKS;
439    } 
440    else 
441    {
442        env->num_reduce_tasks = DEFAULT_NUM_REDUCE_TASKS;
443    }
444    env->num_merge_threads = MIN (
445        env->num_merge_threads, env->num_reduce_tasks / 2);
446
447    if (env->oneOutputQueuePerMapTask) 
448        env->intermediate_task_alloc_len = 
449            args->data_size / env->chunk_size + 1;
450    else
451        env->intermediate_task_alloc_len = env->num_map_threads;
452
453    /* Register callbacks. */
454    env->map = args->map;
455    env->reduce = (args->reduce) ? args->reduce : identity_reduce;
456    env->combiner = args->combiner;
457    env->partition = (args->partition) ? args->partition : default_partition;
458    env->splitter = (args->splitter) ? args->splitter : array_splitter;
459    env->locator = args->locator;
460    env->key_cmp = args->key_cmp;
461
462    /* 2. Initialize structures. */
463
464    env->intermediate_vals = (keyvals_arr_t **)mem_malloc (
465        env->intermediate_task_alloc_len * sizeof (keyvals_arr_t*));
466
467    for (i = 0; i < env->intermediate_task_alloc_len; i++)
468    {
469        env->intermediate_vals[i] = (keyvals_arr_t *)mem_calloc (
470            env->num_reduce_tasks, sizeof (keyvals_arr_t));
471    }
472
473    if (env->oneOutputQueuePerReduceTask)
474    {
475        env->final_vals = 
476            (keyval_arr_t *)mem_calloc (
477                env->num_reduce_tasks, sizeof (keyval_arr_t));
478    }
479    else
480    {
481        env->final_vals =
482            (keyval_arr_t *)mem_calloc (
483                env->num_reduce_threads, sizeof (keyval_arr_t));
484    }
485
486    for (i = 0; i < TASK_TYPE_TOTAL; i++) {
487        /* TODO: Make this tunable */
488        env->schedPolicies[i] = sched_policy_get(SCHED_POLICY_STRAND_FILL);
489    }
490
491    return env;
492}
493
494void env_print (mr_env_t* env)
495{
496    printf (OUT_PREFIX "num_reduce_tasks = %u\n", env->num_reduce_tasks);
497    printf (OUT_PREFIX "num_procs = %u\n", env->num_procs);
498    printf (OUT_PREFIX "proc_offset = %u\n", env->args->proc_offset);
499    printf (OUT_PREFIX "num_map_threads = %u\n", env->num_map_threads);
500    printf (OUT_PREFIX "num_reduce_threads = %u\n", env->num_reduce_threads);
501    printf (OUT_PREFIX "num_merge_threads = %u\n", env->num_merge_threads);
502}
503
504/**
505 * Execute the same function as worker.
506 * @param th_arg    arguments
507 */
508static void *start_my_work (thread_arg_t* th_arg)
509{
510    switch (th_arg->task_type) {
511    case TASK_TYPE_MAP:
512        return map_worker (th_arg);
513        break;
514    case TASK_TYPE_REDUCE:
515        return reduce_worker (th_arg);
516        break;
517    case TASK_TYPE_MERGE:
518        return merge_worker (th_arg);
519        break;
520    default:
521        assert (0);
522        break;
523    }
524}
525
526static void start_thread_pool (
527    tpool_t *tpool, TASK_TYPE_T task_type, thread_arg_t** th_arg_array, int num_workers)
528{
529    thread_func     thread_func;
530
531    switch (task_type) {
532    case TASK_TYPE_MAP:
533        thread_func = map_worker;
534        break;
535    case TASK_TYPE_REDUCE:
536        thread_func = reduce_worker;
537        break;
538    case TASK_TYPE_MERGE:   
539        thread_func = merge_worker;
540        break;
541    default:
542        assert (0);
543        break;
544    }
545
546    CHECK_ERROR (tpool_set (tpool, thread_func, (void **)th_arg_array, num_workers));
547    CHECK_ERROR (tpool_begin (tpool));
548}
549
550/** start_workers()
551 *  thread_func - function pointer to process splitter data
552 *  splitter_func - splitter function pointer
553 *  splitter_init - splitter_init function pointer
554 *  runs map tasks in a new thread on each the available processors.
555 *  returns pointer intermediate value array
556 */
557static void 
558start_workers (mr_env_t* env, thread_arg_t *th_arg)
559{
560    int             thread_index;
561    TASK_TYPE_T     task_type;
562    int             num_threads;
563    int             cpu;
564    intptr_t        ret_val;
565    thread_arg_t    **th_arg_array;
566    void            **rets;
567#ifdef TIMING
568    uint64_t        work_time = 0;
569    uint64_t        user_time = 0;
570    uint64_t        combiner_time = 0;
571#endif
572
573    assert(th_arg != NULL);
574   
575   
576    task_type = th_arg->task_type;
577   
578    //env_print(env);
579    num_threads = getNumTaskThreads (env, task_type);
580
581
582    env->tinfo = (thread_info_t *)mem_calloc (
583        num_threads, sizeof (thread_info_t));
584    th_arg->env = env;
585
586    th_arg_array = (thread_arg_t **)mem_malloc (
587        sizeof (thread_arg_t *) * num_threads);
588    CHECK_ERROR (th_arg_array == NULL);
589
590    for (thread_index = 0; thread_index < num_threads; ++thread_index) {
591
592        cpu = sched_thr_to_cpu (env->schedPolicies[task_type], thread_index + env->args->proc_offset);
593        th_arg->cpu_id = cpu;
594        th_arg->thread_id = thread_index;
595
596        th_arg_array[thread_index] = mem_malloc (sizeof (thread_arg_t));
597        CHECK_ERROR (th_arg_array[thread_index] == NULL);
598        mem_memcpy (th_arg_array[thread_index], th_arg, sizeof (thread_arg_t));
599    }
600
601    start_thread_pool (
602        env->tpool, task_type, &th_arg_array[1], num_threads - 1);
603
604    dprintf("Status: All %d threads have been created\n", num_threads);
605
606    ret_val = (intptr_t)start_my_work (th_arg_array[0]);
607#ifdef TIMING
608    thread_timing_t *timing = (thread_timing_t *)ret_val;
609    work_time += timing->work_time;
610    user_time += timing->user_time;
611    combiner_time += timing->combiner_time;
612    mem_free (timing);
613#endif
614    mem_free (th_arg_array[0]);
615
616    /* Barrier, wait for all threads to finish. */
617    CHECK_ERROR (tpool_wait (env->tpool));
618    rets = tpool_get_results (env->tpool);
619
620    for (thread_index = 1; thread_index < num_threads; ++thread_index)
621    {
622#ifdef TIMING
623        ret_val = (intptr_t)rets[thread_index - 1];
624        thread_timing_t *timing = (thread_timing_t *)ret_val;
625        work_time += timing->work_time;
626        user_time += timing->user_time;
627        combiner_time += timing->combiner_time;
628        mem_free (timing);
629#endif
630        mem_free (th_arg_array[thread_index]);
631    }
632
633    mem_free (th_arg_array);
634    mem_free (rets);
635
636#ifdef TIMING
637    switch (task_type)
638    {
639        case TASK_TYPE_MAP:
640            fprintf (stderr, "map work time: %" PRIu64 "\n",
641                                        work_time / num_threads);
642            fprintf (stderr, "map user time: %" PRIu64 "\n", 
643                                        user_time / num_threads);
644            fprintf (stderr, "map combiner time: %" PRIu64 "\n", 
645                                        combiner_time / num_threads);
646            break;
647
648        case TASK_TYPE_REDUCE:
649            fprintf (stderr, "reduce work time: %" PRIu64 "\n",
650                                        work_time / num_threads);
651            fprintf (stderr, "reduce user time: %" PRIu64 "\n", 
652                                        user_time / num_threads);
653            break;
654
655        case TASK_TYPE_MERGE:
656            fprintf (stderr, "merge work time: %" PRIu64 "\n",
657                                        work_time / num_threads);
658
659        default:
660            break;
661    }
662#endif
663
664    mem_free(env->tinfo);
665    dprintf("Status: All tasks have completed\n"); 
666}
667
668typedef struct {
669    uint64_t            run_time;
670    int                 lgrp;
671} map_worker_task_args_t;
672
673/**
674 * Dequeue the latest task and run it
675 * @return true if ran a task, false otherwise
676 */
677static bool map_worker_do_next_task (
678    mr_env_t *env, int thread_index, map_worker_task_args_t *args)
679{
680    struct timeval  begin, end;
681    int             alloc_len;
682    int             curr_task;
683    task_t          map_task;
684    map_args_t      thread_func_arg;
685    bool            oneOutputQueuePerMapTask;
686    int             lgrp = args->lgrp;
687
688    oneOutputQueuePerMapTask = env->oneOutputQueuePerMapTask;
689
690    alloc_len = env->intermediate_task_alloc_len;
691
692    /* Get new map task. */
693    if (tq_dequeue (env->taskQueue, &map_task, lgrp, thread_index) == 0) {
694        /* no more map tasks */
695        return false;
696    }
697
698    curr_task = env->num_map_tasks++;
699    env->tinfo[thread_index].curr_task = curr_task;
700
701   
702    thread_func_arg.length = map_task.len;
703    thread_func_arg.data = (void *)map_task.data;
704
705 //   dprintf("Task %d: cpu_id -> %x - Started\n", curr_task, (int)env->tinfo[thread_index].tid);
706
707    /* Perform map task. */
708    get_time (&begin);
709    env->map (&thread_func_arg);
710    get_time (&end);
711
712#ifdef TIMING
713    args->run_time = time_diff (&end, &begin);
714#endif
715
716//    dprintf("Task %d: cpu_id -> %x - Done\n", curr_task, (int)env->tinfo[thread_index].tid);
717
718    return true;
719}
720
721/**
722 * map_worker()
723 * args - pointer to thread_arg_t
724 * returns 0 on success
725 * This runs thread_func() until there is no more data from the splitter().
726 * The pointer to results are stored in return_values array.
727 */
728static void *
729map_worker (void *args)
730{
731    assert (args != NULL);
732
733    struct timeval          begin, end;
734    struct timeval          work_begin, work_end;
735    uintptr_t               user_time = 0;
736    thread_arg_t            *th_arg = (thread_arg_t *)args;
737    mr_env_t                *env = th_arg->env;
738    int                     thread_index = th_arg->thread_id;
739    int                     num_assigned = 0;
740    map_worker_task_args_t  mwta;
741#ifdef TIMING
742    uintptr_t               work_time = 0;
743    uintptr_t               combiner_time = 0;
744#endif
745
746    env->tinfo[thread_index].tid = pthread_self();
747
748    /* Bind thread. */
749    CHECK_ERROR (proc_bind_thread (th_arg->cpu_id) != 0);
750
751    CHECK_ERROR (pthread_setspecific (env_key, env));
752#ifdef TIMING
753    CHECK_ERROR (pthread_setspecific (emit_time_key, 0));
754#endif
755
756    mwta.lgrp = loc_get_lgrp();
757
758    get_time (&work_begin);
759    while (map_worker_do_next_task (env, thread_index, &mwta)) {
760        user_time += mwta.run_time;
761        num_assigned++;
762    }
763    get_time (&work_end);
764
765#ifdef TIMING
766    work_time = time_diff (&work_end, &work_begin);
767#endif
768
769    get_time (&begin);
770
771    /* Apply combiner to local map results. */
772#ifndef INCREMENTAL_COMBINER
773    if (env->combiner != NULL)
774        run_combiner (env, thread_index);
775#endif
776
777    get_time (&end);
778
779#ifdef TIMING
780    combiner_time = time_diff (&end, &begin);
781#endif
782
783    dprintf("Status: Total of %d tasks were assigned to cpu_id %d\n", 
784        num_assigned, th_arg->cpu_id);
785
786    /* Unbind thread. */
787    CHECK_ERROR (proc_unbind_thread () != 0);
788
789#ifdef TIMING
790    thread_timing_t *timing = calloc (1, sizeof (thread_timing_t));
791    uintptr_t emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
792    timing->user_time = user_time - emit_time;
793    timing->work_time = work_time - timing->user_time;
794    timing->combiner_time = combiner_time;
795    return (void *)timing;
796#else
797    return (void *)0;
798#endif
799}
800
801typedef struct {
802    struct iterator_t   itr;
803    uint64_t            run_time;
804    int                 num_map_threads;
805    int                 lgrp;
806} reduce_worker_task_args_t;
807
808/**
809 * Dequeue next reduce task and do it
810 * TODO Refactor this even more. It is still gross.
811 * @return true if did work, false otherwise
812 */
813static bool reduce_worker_do_next_task (
814    mr_env_t *env, int thread_index, reduce_worker_task_args_t *args)
815{
816    struct timeval  begin, end;
817    intptr_t        curr_reduce_task = 0;
818    keyvals_t       *min_key_val, *next_min;
819    task_t          reduce_task;
820    int             num_map_threads;
821    int             curr_thread;
822    int             lgrp = args->lgrp;
823
824    int             advise_pos;
825
826    /* Get the next reduce task. */
827    if (tq_dequeue (env->taskQueue, &reduce_task, lgrp, thread_index) == 0) {
828        /* No more reduce tasks. */
829        return false;
830    }
831
832    curr_reduce_task = (intptr_t)reduce_task.id;
833
834    env->tinfo[thread_index].curr_task = curr_reduce_task;
835
836    num_map_threads =  args->num_map_threads;
837
838    args->run_time = 0;
839    min_key_val = NULL;
840    next_min = NULL;
841
842    do {
843        for (curr_thread = 0; curr_thread < num_map_threads; curr_thread++) {
844            keyvals_t       *curr_key_val;
845            keyvals_arr_t   *thread_array;
846
847            /* Find the next array to search. */
848            thread_array = 
849                &env->intermediate_vals[curr_thread][curr_reduce_task];
850
851            /* Check if the current processor array has been
852               completely searched. */
853            if (thread_array->pos >= thread_array->len) {
854                continue;
855            }
856
857            /* Get the next key in the processor array. */
858            curr_key_val = &thread_array->arr[thread_array->pos];
859
860
861            /* If the key matches the minimum value,
862               add the value to the list of values for that key. */
863            if (min_key_val != NULL &&
864                !env->key_cmp(curr_key_val->key, min_key_val->key)) {
865                CHECK_ERROR (iter_add (&args->itr, curr_key_val));
866                thread_array->pos += 1;
867                --curr_thread;
868            }
869            /* Find the location of the next min. */
870            else if (next_min == NULL || 
871                env->key_cmp(curr_key_val->key, next_min->key) < 0)
872            {
873                next_min = curr_key_val;
874            }
875        }
876
877        if (min_key_val != NULL) {
878            keyvals_t       *curr_key_val;
879
880            if (env->reduce != identity_reduce) {
881                get_time (&begin);
882                env->reduce (min_key_val->key, &args->itr);
883                get_time (&end);
884#ifdef TIMING
885                args->run_time += time_diff (&end, &begin);
886#endif
887            } else {
888                env->reduce (min_key_val->key, &args->itr);
889            }
890
891            /* Free up memory */
892            iter_rewind (&args->itr);
893            while (iter_next_list (&args->itr, &curr_key_val)) {
894                val_t   *vals, *next;
895
896                vals = curr_key_val->vals;
897                while (vals != NULL) {
898                    next = vals->next_val;
899                    mem_free (vals);
900                    vals = next;
901                }
902            }
903
904            iter_reset(&args->itr);
905        }
906
907        min_key_val = next_min;
908        next_min = NULL;
909
910        /* See if there are any elements left. */
911        for(curr_thread = 0;
912            curr_thread < num_map_threads &&
913            env->intermediate_vals[curr_thread][curr_reduce_task].pos >=
914            env->intermediate_vals[curr_thread][curr_reduce_task].len;
915            curr_thread++);
916    } while (curr_thread != num_map_threads);
917
918    /* Free up the memory. */
919    for (curr_thread = 0; curr_thread < num_map_threads; curr_thread++) {
920        keyvals_arr_t   *arr;
921
922        arr = &env->intermediate_vals[curr_thread][curr_reduce_task];
923        if (arr->alloc_len != 0)
924            mem_free(arr->arr);
925    }
926
927    return true;
928}
929
930static void *
931reduce_worker (void *args)
932{
933    assert(args != NULL);
934
935    struct timeval              work_begin, work_end;
936    uintptr_t                   user_time = 0;
937    thread_arg_t                *th_arg = (thread_arg_t *)args;
938    int                         thread_index = th_arg->thread_id;
939    mr_env_t                    *env = th_arg->env;
940    reduce_worker_task_args_t   rwta;
941    int                         num_map_threads;
942#ifdef TIMING
943    uintptr_t                   work_time = 0;
944#endif
945
946    env->tinfo[thread_index].tid = pthread_self();
947
948    /* Bind thread. */
949    CHECK_ERROR (proc_bind_thread (th_arg->cpu_id) != 0);
950
951    CHECK_ERROR (pthread_setspecific (env_key, env));
952#ifdef TIMING
953    CHECK_ERROR (pthread_setspecific (emit_time_key, 0));
954#endif
955
956    if (env->oneOutputQueuePerMapTask)
957        num_map_threads = env->num_map_tasks;
958    else
959        num_map_threads = env->num_map_threads;
960
961    /* Assuming !oneOutputQueuePerMapTask */
962    CHECK_ERROR (iter_init (&rwta.itr, env->num_map_threads));
963    rwta.num_map_threads = num_map_threads;
964    rwta.lgrp = loc_get_lgrp();
965
966    get_time (&work_begin);
967
968    while (reduce_worker_do_next_task (env, thread_index, &rwta)) {
969        user_time += rwta.run_time;
970    }
971
972    get_time (&work_end);
973
974#ifdef TIMING
975    work_time = time_diff (&work_end, &work_begin);
976#endif
977
978    iter_finalize (&rwta.itr);
979
980    /* Unbind thread. */
981    CHECK_ERROR (proc_unbind_thread () != 0);
982
983#ifdef TIMING
984    thread_timing_t *timing = calloc (1, sizeof (thread_timing_t));
985    uintptr_t emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
986    timing->user_time = user_time - emit_time;
987    timing->work_time = work_time - timing->user_time;
988    return (void *)timing;
989#else
990    return (void *)0;
991#endif
992}
993
994/** merge_worker()
995* args - pointer to thread_arg_t
996* returns 0 on success
997*/
998static void *
999merge_worker (void *args) 
1000{
1001    assert(args != NULL);
1002
1003    struct timeval  work_begin, work_end;
1004    thread_arg_t    *th_arg = (thread_arg_t *)args;
1005    int             thread_index = th_arg->thread_id;
1006    mr_env_t        *env = th_arg->env;
1007    int             cpu;
1008#ifdef TIMING
1009    uintptr_t       work_time = 0;
1010#endif
1011
1012    env->tinfo[thread_index].tid = pthread_self();
1013
1014    /* Bind thread.
1015       Spread out the merge workers as much as possible. */
1016    if (env->oneOutputQueuePerReduceTask)
1017        cpu = th_arg->cpu_id * (1 << (th_arg->merge_round - 1));
1018    else
1019        cpu = th_arg->cpu_id * (1 << th_arg->merge_round);
1020
1021    CHECK_ERROR (proc_bind_thread (cpu) != 0);
1022
1023    CHECK_ERROR (pthread_setspecific (env_key, env));
1024
1025    /* Assumes num_merge_threads is modified before each call. */
1026    int length = th_arg->merge_len / env->num_merge_threads;
1027    int modlen = th_arg->merge_len % env->num_merge_threads;
1028
1029    /* Let's make some progress here. */
1030    if (length <= 1) {
1031        length = 2;
1032        modlen = th_arg->merge_len % 2;
1033    }
1034
1035    int pos = thread_index * length + 
1036                ((thread_index < modlen) ? thread_index : modlen);
1037
1038    if (pos < th_arg->merge_len) {
1039
1040        keyval_arr_t *vals = &th_arg->merge_input[pos];
1041
1042//        dprintf("Thread %d: cpu_id -> %d - Started\n",
1043  //                  thread_index, th_arg->cpu_id);
1044
1045        get_time (&work_begin);
1046        merge_results (th_arg->env, vals, length + (thread_index < modlen));
1047        get_time (&work_end);
1048
1049#ifdef TIMING
1050        work_time = time_diff (&work_end, &work_begin);
1051#endif
1052
1053//        dprintf("Thread %d: cpu_id -> %d - Done\n",
1054//                    thread_index, th_arg->cpu_id);
1055    }
1056
1057    /* Unbind thread. */
1058    CHECK_ERROR (proc_unbind_thread () != 0);
1059
1060#ifdef TIMING
1061    thread_timing_t *timing = calloc (1, sizeof (thread_timing_t));
1062    timing->work_time = work_time;
1063    return (void *)timing;
1064#else
1065    return (void *)0;
1066#endif
1067}
1068
1069/**
1070 * Split phase of map task generation, creates all tasks and throws in single
1071 * queue.
1072 *
1073 * @param q     queue to place tasks into
1074 * @return number of tasks generated, or 0 on error
1075 */
1076static int gen_map_tasks_split (mr_env_t* env, queue_t* q)
1077{
1078    int                 cur_task_id;
1079    map_args_t          args;
1080    task_queued         *task = NULL;
1081
1082    /* split until complete */
1083    cur_task_id = 0;
1084    while (env->splitter (env->args->task_data, env->chunk_size, &args))
1085    {
1086        task = (task_queued *)mem_malloc (sizeof (task_queued));
1087        task->task.id = cur_task_id;
1088        task->task.len = (uint32_t)args.length;
1089        task->task.data = (uint32_t)args.data;
1090        //madvise(args.data,args.length,MADV_MIGRATE);
1091        queue_push_back (q, &task->queue_elem);
1092
1093        ++cur_task_id;
1094    }
1095
1096    if (task == NULL) {
1097        /* not enough memory, undo what's been done, error out */
1098        queue_elem_t    *queue_elem;
1099
1100        while (queue_pop_front (q, &queue_elem))
1101        {
1102            task = queue_entry (queue_elem, task_queued, queue_elem);
1103            assert (task != NULL);
1104            mem_free (task);
1105        }
1106
1107        return 0;
1108    }
1109
1110    return cur_task_id;
1111}
1112
1113/**
1114 * User provided own splitter function but did not supply a locator function.
1115 * Nothing to do here about locality, so just try to put consecutive tasks
1116 * in the same task queue.
1117 */
1118static int gen_map_tasks_distribute_lgrp (
1119    mr_env_t* env, int num_map_tasks, queue_t* q)
1120{
1121    queue_elem_t    *queue_elem;
1122    int             tasks_per_lgrp;
1123    int             tasks_leftover;
1124    int             num_lgrps;
1125    int             lgrp;
1126
1127    num_lgrps = env->num_map_threads / loc_get_lgrp_size();
1128    if (num_lgrps == 0) num_lgrps = 1;
1129
1130    tasks_per_lgrp = num_map_tasks / num_lgrps;
1131    tasks_leftover = num_map_tasks - tasks_per_lgrp * num_lgrps;
1132
1133    /* distribute tasks across locality groups */
1134    for (lgrp = 0; lgrp < num_lgrps; ++lgrp)
1135    {
1136        int remaining_cur_lgrp_tasks;
1137
1138        remaining_cur_lgrp_tasks = tasks_per_lgrp;
1139        if (tasks_leftover > 0) {
1140            remaining_cur_lgrp_tasks++;
1141            tasks_leftover--;
1142        }
1143        do {
1144            task_queued *task;
1145
1146            if (queue_pop_front (q, &queue_elem) == 0) {
1147                /* queue is empty, everything is distributed */
1148                break;
1149            }
1150
1151            task = queue_entry (queue_elem, task_queued, queue_elem);
1152            assert (task != NULL);
1153
1154            if (tq_enqueue_seq (env->taskQueue, &task->task, lgrp) < 0) {
1155                mem_free (task);
1156                return -1;
1157            }
1158
1159            mem_free (task);
1160            remaining_cur_lgrp_tasks--;
1161        } while (remaining_cur_lgrp_tasks);
1162
1163        if (remaining_cur_lgrp_tasks != 0) {
1164            break;
1165        }
1166    }
1167
1168    return 0;
1169}
1170
1171/**
1172 * We can try to queue tasks based on locality info
1173 */
1174static int gen_map_tasks_distribute_locator (
1175    mr_env_t* env, int num_map_tasks, queue_t* q)
1176{
1177    queue_elem_t    *queue_elem;
1178
1179    while (queue_pop_front (q, &queue_elem))
1180    {
1181        task_queued *task;
1182        int         lgrp;
1183        map_args_t  args;
1184
1185        task = queue_entry (queue_elem, task_queued, queue_elem);
1186        assert (task != NULL);
1187
1188        args.length = task->task.len;
1189        args.data = (void*)task->task.data;
1190
1191        if (env->locator != NULL) {
1192            void    *addr;
1193            addr = env->locator (&args);
1194            lgrp = loc_mem_to_lgrp (addr);
1195        } else {
1196            lgrp = loc_mem_to_lgrp (args.data);
1197        }
1198
1199        task->task.v[3] = lgrp;         /* For debugging. */
1200        if (tq_enqueue_seq (env->taskQueue, &task->task, lgrp) != 0) {
1201            mem_free (task);
1202            return -1;
1203        }
1204
1205        mem_free (task);
1206    }
1207
1208    return 0;
1209}
1210
1211/**
1212 * Distributes tasks to threads
1213 * @param num_map_tasks number of map tasks in queue
1214 * @param q queue of tasks to distribute
1215 * @return 0 on success, less than 0 on failure
1216 */
1217static int gen_map_tasks_distribute (
1218    mr_env_t* env, int num_map_tasks, queue_t* q)
1219{
1220    if ((env->splitter != array_splitter) && 
1221        (env->locator == NULL)) {
1222        return gen_map_tasks_distribute_lgrp (env, num_map_tasks, q);
1223    } else {
1224        return gen_map_tasks_distribute_locator (env, num_map_tasks, q);
1225    }
1226
1227    return 0;
1228}
1229
1230/**
1231 * Generate all map tasks and queue them up
1232 * @return number of map tasks created if successful, negative value on error
1233 */
1234static int gen_map_tasks (mr_env_t* env)
1235{
1236    int             ret;
1237    int             num_map_tasks;
1238    queue_t         temp_queue;
1239    int             num_map_threads;
1240
1241    queue_init (&temp_queue);
1242
1243    num_map_tasks = gen_map_tasks_split (env, &temp_queue);
1244    if (num_map_tasks <= 0) {
1245        return -1;
1246    }
1247
1248    num_map_threads = env->num_map_threads;
1249    if (num_map_tasks < num_map_threads)
1250        num_map_threads = num_map_tasks;
1251    tq_reset (env->taskQueue, num_map_threads);
1252
1253    ret = gen_map_tasks_distribute (env, num_map_tasks, &temp_queue);
1254    if (ret == 0) ret = num_map_tasks;
1255
1256    return num_map_tasks;
1257}
1258
1259static int gen_reduce_tasks (mr_env_t* env)
1260{
1261    int ret, tid;
1262    int tasks_per_thread;
1263    int tasks_leftover;
1264    int task_id;
1265    task_t reduce_task;
1266
1267    tq_reset (env->taskQueue, env->num_reduce_threads);
1268
1269    tasks_per_thread = env->num_reduce_tasks / env->num_reduce_threads;
1270    tasks_leftover = env->num_reduce_tasks - 
1271        tasks_per_thread * env->num_map_threads;
1272
1273    task_id = 0;
1274    for (tid = 0; tid < env->num_reduce_threads; ++tid) {
1275        int remaining_cur_thread_tasks;
1276
1277        remaining_cur_thread_tasks = tasks_per_thread;
1278        if (tasks_leftover > 0) {
1279            remaining_cur_thread_tasks += 1;
1280            --tasks_leftover;
1281        }
1282        do {
1283            if (task_id == env->num_reduce_tasks) {
1284                return 0;
1285            }
1286
1287            /* New task. */
1288            reduce_task.id = task_id;
1289
1290            /* TODO: Implement locality optimization. */
1291            //madvise((void*)reduce_task.data,reduce_task.len,MADV_MIGRATE);
1292           
1293            ret = tq_enqueue_seq (env->taskQueue, &reduce_task,  -1);
1294            if (ret < 0) {
1295                return -1;
1296            }
1297            ++task_id;
1298            --remaining_cur_thread_tasks;
1299        } while (remaining_cur_thread_tasks);
1300    }
1301
1302    return 0;
1303}
1304
1305#ifndef INCREMENTAL_COMBINER
1306static void run_combiner (mr_env_t* env, int thread_index)
1307{
1308    assert (! env->oneOutputQueuePerMapTask);
1309
1310    int i, j;
1311    keyvals_arr_t *my_output;
1312    keyvals_t *reduce_pos;
1313    void *reduced_val;
1314    iterator_t itr;
1315    val_t *val, *next;
1316
1317    CHECK_ERROR (iter_init (&itr, 1));
1318
1319    for (i = 0; i < env->num_reduce_tasks; ++i)
1320    {
1321        my_output = &env->intermediate_vals[thread_index][i];
1322        for (j = 0; j < my_output->len; ++j)
1323        {
1324            reduce_pos = &(my_output->arr[j]);
1325            if (reduce_pos->len == 0) continue;
1326
1327            CHECK_ERROR (iter_add (&itr, reduce_pos));
1328
1329            reduced_val = env->combiner (&itr);
1330
1331            /* Shed off trailing chunks. */
1332            assert (reduce_pos->vals);
1333            val = reduce_pos->vals->next_val;
1334            while (val)
1335            {
1336                next = val->next_val;
1337                mem_free (val);
1338                val = next;
1339            }
1340
1341            /* Update the entry. */
1342            val = reduce_pos->vals;
1343            val->next_insert_pos = 0;
1344            val->next_val = NULL;
1345            val->array[val->next_insert_pos++] = reduced_val;
1346            reduce_pos->len = 1;
1347
1348            iter_reset (&itr);
1349        }
1350    }
1351
1352    iter_finalize (&itr);
1353}
1354#endif
1355
1356/** emit_intermediate()
1357 *  inserts the key, val pair into the intermediate array
1358 */
1359void 
1360emit_intermediate (void *key, void *val, int key_size)
1361{
1362    struct timeval  begin, end;
1363    //static __thread int curr_thread = -1;
1364    int             curr_thread;
1365    int             curr_task;
1366    bool            oneOutputQueuePerMapTask;
1367    keyvals_arr_t   *arr;
1368    mr_env_t        *env;
1369
1370    get_time (&begin);
1371
1372    env = get_env();
1373
1374    curr_thread = getCurrThreadIndex (TASK_TYPE_MAP);
1375   
1376    oneOutputQueuePerMapTask = env->oneOutputQueuePerMapTask;
1377
1378    if (oneOutputQueuePerMapTask)
1379        curr_task = env->tinfo[curr_thread].curr_task;
1380    else
1381        curr_task = curr_thread;
1382   
1383    int reduce_pos = env->partition (env->num_reduce_tasks, key, key_size);
1384    reduce_pos %= env->num_reduce_tasks;
1385
1386    /* Insert sorted in global queue at pos curr_proc */
1387    arr = &env->intermediate_vals[curr_task][reduce_pos];
1388
1389    insert_keyval_merged (env, arr, key, val);
1390
1391    get_time (&end);
1392
1393#ifdef TIMING
1394    uintptr_t total_emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
1395    uintptr_t emit_time = time_diff (&end, &begin);
1396    total_emit_time += emit_time;
1397    CHECK_ERROR (pthread_setspecific (emit_time_key, (void *)total_emit_time));
1398#endif
1399}
1400
1401/** emit_inline ()
1402 *  inserts the key, val pair into the final output array
1403 */
1404static inline void 
1405emit_inline (mr_env_t* env, void *key, void *val)
1406{
1407    keyval_arr_t    *arr;
1408    int             curr_red_queue;
1409    //static __thread int thread_index = -1;
1410    int             thread_index;
1411
1412    thread_index = getCurrThreadIndex (TASK_TYPE_REDUCE);
1413
1414    if (env->oneOutputQueuePerReduceTask) {
1415        curr_red_queue = env->tinfo[thread_index].curr_task;
1416    }
1417    else {
1418        curr_red_queue = thread_index;
1419    }
1420
1421    /* Insert sorted in global queue at pos curr_proc */
1422    arr = &env->final_vals[curr_red_queue];
1423    insert_keyval (env, arr, key, val);
1424}
1425
1426/** emit ()
1427 */
1428void
1429emit (void *key, void *val)
1430{
1431    struct timeval begin, end;
1432
1433    get_time (&begin);
1434
1435    emit_inline (get_env(), key, val);
1436
1437    get_time (&end);
1438
1439#ifdef TIMING
1440    uintptr_t total_emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
1441    uintptr_t emit_time = time_diff (&end, &begin);
1442    total_emit_time += emit_time;
1443    CHECK_ERROR (pthread_setspecific (emit_time_key, (void *)total_emit_time));
1444#endif
1445}
1446
1447static inline void 
1448insert_keyval_merged (mr_env_t* env, keyvals_arr_t *arr, void *key, void *val)
1449{
1450    int high = arr->len, low = -1, next;
1451    int cmp = 1;
1452    keyvals_t *insert_pos;
1453    val_t *new_vals;
1454
1455    assert(arr->len <= arr->alloc_len);
1456    if (arr->len > 0)
1457        cmp = env->key_cmp(arr->arr[arr->len - 1].key, key);
1458
1459    if (cmp > 0)
1460    {
1461        /* Binary search the array to find the key. */
1462        while (high - low > 1)
1463        {
1464            next = (high + low) / 2;
1465            if (env->key_cmp(arr->arr[next].key, key) > 0)
1466                high = next;
1467            else
1468                low = next;
1469        }
1470
1471        if (low < 0) low = 0;
1472        if (arr->len > 0 &&
1473                (cmp = env->key_cmp(arr->arr[low].key, key)) < 0)
1474            low++;
1475    }
1476    else if (cmp < 0)
1477        low = arr->len;
1478    else
1479        low = arr->len-1;
1480
1481    if (arr->len == 0 || cmp)
1482    {
1483        /* if array is full, double and copy over. */
1484        if (arr->len == arr->alloc_len)
1485        {
1486            if (arr->alloc_len == 0)
1487            {
1488                arr->alloc_len = DEFAULT_KEYVAL_ARR_LEN;
1489                arr->arr = (keyvals_t *)
1490                    mem_malloc (arr->alloc_len * sizeof (keyvals_t));
1491            }
1492            else
1493            {
1494                arr->alloc_len *= 2;
1495                arr->arr = (keyvals_t *)
1496                    mem_realloc (arr->arr, arr->alloc_len * sizeof (keyvals_t));
1497            }
1498        }
1499
1500        /* Insert into array. */
1501        memmove (&arr->arr[low+1], &arr->arr[low], 
1502                        (arr->len - low) * sizeof(keyvals_t));
1503
1504        arr->arr[low].key = key;
1505        arr->arr[low].len = 0;
1506        arr->arr[low].vals = NULL;
1507        arr->len++;
1508    }
1509
1510    insert_pos = &(arr->arr[low]);
1511
1512    if (insert_pos->vals == NULL)
1513    {
1514        /* Allocate a chunk for the first time. */
1515        new_vals = mem_malloc
1516            (sizeof (val_t) + DEFAULT_VALS_ARR_LEN * sizeof (void *));
1517        assert (new_vals);
1518
1519        new_vals->size = DEFAULT_VALS_ARR_LEN;
1520        new_vals->next_insert_pos = 0;
1521        new_vals->next_val = NULL;
1522
1523        insert_pos->vals = new_vals;
1524    }
1525    else if (insert_pos->vals->next_insert_pos >= insert_pos->vals->size)
1526    {
1527#ifdef INCREMENTAL_COMBINER
1528        if (env->combiner != NULL) {
1529            iterator_t itr;
1530            void *reduced_val;
1531
1532            CHECK_ERROR (iter_init (&itr, 1));
1533            CHECK_ERROR (iter_add (&itr, insert_pos));
1534
1535            reduced_val = env->combiner (&itr);
1536
1537            insert_pos->vals->array[0] = reduced_val;
1538            insert_pos->vals->next_insert_pos = 1;
1539            insert_pos->len = 1;
1540
1541            iter_finalize (&itr);
1542        } else {
1543#endif
1544            /* Need a new chunk. */
1545            int alloc_size;
1546
1547            alloc_size = insert_pos->vals->size * 2;
1548            new_vals = mem_malloc (sizeof (val_t) + alloc_size * sizeof (void *));
1549            assert (new_vals);
1550
1551            new_vals->size = alloc_size;
1552            new_vals->next_insert_pos = 0;
1553            new_vals->next_val = insert_pos->vals;
1554
1555            insert_pos->vals = new_vals;
1556#ifdef INCREMENTAL_COMBINER
1557        }
1558#endif
1559    }
1560
1561    insert_pos->vals->array[insert_pos->vals->next_insert_pos++] = val;
1562
1563    insert_pos->len += 1;
1564}
1565
1566static inline void 
1567insert_keyval (mr_env_t* env, keyval_arr_t *arr, void *key, void *val)
1568{
1569    int high = arr->len, low = -1, next;
1570    int cmp = 1;
1571
1572    assert(arr->len <= arr->alloc_len);
1573
1574    /* If array is full, double and copy over. */
1575    if (arr->len == arr->alloc_len)
1576    {
1577        if (arr->alloc_len == 0)
1578        {
1579            arr->alloc_len = DEFAULT_KEYVAL_ARR_LEN;
1580            arr->arr = (keyval_t*)mem_malloc(arr->alloc_len * sizeof(keyval_t));
1581        }
1582        else
1583        {
1584            arr->alloc_len *= 2;
1585            arr->arr = (keyval_t*)mem_realloc(arr->arr, arr->alloc_len * sizeof(keyval_t));
1586        }
1587    }
1588
1589    if (env->oneOutputQueuePerReduceTask == false)
1590    {
1591        /* Need to sort. */
1592        if (arr->len > 0)
1593            cmp = env->key_cmp(arr->arr[arr->len - 1].key, key);
1594
1595        if (cmp > 0)
1596        {
1597            /* Binary search the array to find the key. */
1598            while (high - low > 1)
1599            {
1600                next = (high + low) / 2;
1601                if (env->key_cmp(arr->arr[next].key, key) > 0)
1602                    high = next;
1603                else
1604                    low = next;
1605            }
1606
1607            if (low < 0) low = 0;
1608            if (arr->len > 0 && env->key_cmp(arr->arr[low].key, key) < 0)
1609                low++;
1610        }
1611        else
1612            low = arr->len;
1613
1614
1615        /* Insert into array. */
1616        memmove (&arr->arr[low+1], &arr->arr[low], 
1617            (arr->len - low) * sizeof(keyval_t));
1618
1619        arr->arr[low].key = key;
1620        arr->arr[low].val = val;
1621    }
1622    else
1623    {
1624        /* No need to sort. Just append. */
1625        arr->arr[arr->len].key = key;
1626        arr->arr[arr->len].val = val;
1627    }
1628
1629    arr->len++;
1630}
1631
1632static inline void 
1633merge_results (mr_env_t* env, keyval_arr_t *vals, int length) 
1634{
1635    int data_idx;
1636    int total_num_keys = 0;
1637    int i;
1638    //static __thread int curr_thread = -1;
1639    int curr_thread;
1640
1641    curr_thread = getCurrThreadIndex (TASK_TYPE_MERGE);
1642
1643    for (i = 0; i < length; i++) {
1644        total_num_keys += vals[i].len;
1645    }
1646
1647    env->merge_vals[curr_thread].len = total_num_keys;
1648    env->merge_vals[curr_thread].alloc_len = total_num_keys;
1649    env->merge_vals[curr_thread].pos = 0;
1650    env->merge_vals[curr_thread].arr = (keyval_t *)
1651      mem_malloc(sizeof(keyval_t) * total_num_keys);
1652
1653    for (data_idx = 0; data_idx < total_num_keys; data_idx++) {
1654        /* For each keyval_t. */
1655        int         min_idx;
1656        keyval_t    *min_keyval;
1657
1658        for (i = 0; i < length && vals[i].pos >= vals[i].len; i++);
1659
1660        if (i == length) break;
1661
1662        /* Determine the minimum key. */
1663        min_idx = i;
1664        min_keyval = &vals[i].arr[vals[i].pos];
1665
1666        for (i++; i < length; i++) {
1667            if (vals[i].pos < vals[i].len)
1668            {
1669                int cmp_ret;
1670                cmp_ret = env->key_cmp(
1671                    min_keyval->key, 
1672                    vals[i].arr[vals[i].pos].key);
1673
1674                if (cmp_ret > 0) {
1675                    min_idx = i;
1676                    min_keyval = &vals[i].arr[vals[i].pos];
1677                }
1678            }
1679        }
1680
1681        mem_memcpy (&env->merge_vals[curr_thread].arr[data_idx], 
1682                        min_keyval, sizeof(keyval_t));
1683        vals[min_idx].pos += 1;
1684    }
1685
1686    for (i = 0; i < length; i++) {
1687        mem_free(vals[i].arr);
1688    }
1689}
1690
1691static inline int 
1692getNumTaskThreads (mr_env_t* env, TASK_TYPE_T task_type)
1693{
1694    int num_threads;
1695    //env_print(env);
1696    switch (task_type)
1697    {
1698        case TASK_TYPE_MAP:
1699            num_threads = env->num_map_threads;
1700            break;
1701
1702        case TASK_TYPE_REDUCE:
1703            num_threads = env->num_reduce_threads;
1704            break;
1705
1706        case TASK_TYPE_MERGE:
1707            num_threads = env->num_merge_threads;
1708            break;
1709
1710        default:
1711            assert (0);
1712            num_threads = env->num_map_threads;
1713            break;
1714    }
1715
1716    return num_threads;
1717}
1718
1719/** getCurrThreadIndex()
1720 *  Returns the processor the current thread is running on
1721 */
1722static inline int 
1723getCurrThreadIndex (TASK_TYPE_T task_type)
1724{
1725    uint32_t       i;
1726    const uint32_t flag = 0x80000000;
1727    uint32_t       num_threads;
1728    mr_env_t       *env;
1729    pthread_t      mytid;
1730    void           *val;
1731
1732    val = pthread_getspecific(thread_index_key);
1733
1734    if(val == NULL) 
1735    {
1736      env = get_env();
1737      num_threads = getNumTaskThreads (env, task_type);
1738      mytid = pthread_self();
1739     
1740      for (i = 0; i < num_threads && env->tinfo[i].tid != mytid; i++);
1741
1742      assert(i != num_threads);
1743      val = (void*)(i |= flag);
1744
1745      CHECK_ERROR(pthread_setspecific(thread_index_key, val));
1746    }
1747    else 
1748    {
1749      i = (uint32_t)val & ~flag;
1750    }
1751
1752    return i;
1753}
1754
1755/** array_splitter()
1756 *
1757 */
1758int 
1759array_splitter (void *data_in, int req_units, map_args_t *out)
1760{
1761    assert(out != NULL);
1762
1763    mr_env_t    *env;
1764    int         unit_size;
1765    int         data_units;
1766
1767    env = get_env();
1768    unit_size = env->args->unit_size;
1769    data_units = env->args->data_size / unit_size;
1770
1771    /* End of data reached, return FALSE. */
1772    if (env->splitter_pos >= data_units)
1773        return 0;
1774
1775    /* Set the start of the next data. */
1776    out->data = ((void *)env->args->task_data) + env->splitter_pos*unit_size;
1777
1778    /* Determine the nominal length. */
1779    if (env->splitter_pos + req_units > data_units)
1780        out->length = data_units - env->splitter_pos;
1781    else
1782        out->length = req_units;
1783
1784    env->splitter_pos += out->length;
1785
1786    /* Return true since the out data is valid. */
1787    return 1;
1788}
1789
1790void 
1791identity_reduce (void *key, iterator_t *itr)
1792{
1793    void        *val;
1794    mr_env_t    *env;
1795    env = get_env();
1796    while (iter_next (itr, &val))
1797    {
1798        emit_inline (env, key, val);
1799    }
1800    //printf("reduce done\n");
1801}
1802
1803int 
1804default_partition (int num_reduce_tasks, void* key, int key_size)
1805{
1806    unsigned long hash = 5381;
1807    char *str = (char *)key;
1808    int i;
1809
1810    for (i = 0; i < key_size; i++)
1811    {
1812        hash = ((hash << 5) + hash) + ((int)str[i]); /* hash * 33 + c */
1813    }
1814
1815    return hash % num_reduce_tasks;
1816}
1817
1818/**
1819 * Run map tasks and get intermediate values
1820 */
1821static void map (mr_env_t* env)
1822{
1823    thread_arg_t   th_arg;
1824    int            num_map_tasks;
1825
1826    num_map_tasks = gen_map_tasks (env);
1827    assert (num_map_tasks >= 0);
1828   
1829    printf("Number of Task : %d\n", num_map_tasks);
1830
1831    env->num_map_tasks = num_map_tasks;
1832    if (num_map_tasks < env->num_map_threads)
1833        env->num_map_threads = num_map_tasks;
1834
1835    //printf (OUT_PREFIX "num_map_tasks = %d\n", env->num_map_tasks);
1836
1837    mem_memset (&th_arg, 0, sizeof(thread_arg_t));
1838    th_arg.task_type = TASK_TYPE_MAP;
1839
1840    start_workers (env, &th_arg);
1841}
1842
1843/**
1844 * Run reduce tasks and get final values.
1845 */
1846static void reduce (mr_env_t* env)
1847{
1848    int            i;
1849    thread_arg_t   th_arg;
1850
1851    CHECK_ERROR (gen_reduce_tasks (env));
1852
1853    mem_memset (&th_arg, 0, sizeof(thread_arg_t));
1854    th_arg.task_type = TASK_TYPE_REDUCE;
1855
1856    start_workers (env, &th_arg);
1857
1858    /* Cleanup intermediate results. */
1859    for (i = 0; i < env->intermediate_task_alloc_len; ++i)
1860    {
1861        mem_free (env->intermediate_vals[i]);
1862    }
1863    mem_free (env->intermediate_vals);
1864}
1865
1866/**
1867 * Merge all reduced data
1868 */
1869static void merge (mr_env_t* env)
1870{
1871    thread_arg_t   th_arg;
1872
1873    mem_memset (&th_arg, 0, sizeof (thread_arg_t));
1874    th_arg.task_type = TASK_TYPE_MERGE;
1875
1876    if (env->oneOutputQueuePerReduceTask) {
1877        th_arg.merge_len = env->num_reduce_tasks;
1878    } else {
1879        th_arg.merge_len = env->num_reduce_threads;
1880    }
1881    th_arg.merge_input = env->final_vals;
1882
1883    if (th_arg.merge_len <= 1) {
1884        /* Already merged, nothing to do here */
1885        env->args->result->data = env->final_vals->arr;
1886        env->args->result->length = env->final_vals->len;
1887
1888        mem_free(env->final_vals);
1889
1890        return;
1891    }
1892
1893    /* have work to merge! */
1894    while (th_arg.merge_len > 1) {
1895        th_arg.merge_round += 1;
1896
1897        /* This is the worst case length,
1898           depending on the value of num_merge_threads. */
1899        env->merge_vals = (keyval_arr_t*) 
1900            mem_malloc (env->num_merge_threads * sizeof(keyval_arr_t));
1901
1902        /* Run merge tasks and get merge values. */
1903        start_workers (env, &th_arg);
1904
1905        mem_free (th_arg.merge_input);
1906        th_arg.merge_len = env->num_merge_threads;
1907
1908        env->num_merge_threads /= 2;
1909        if (env->num_merge_threads == 0)
1910            env->num_merge_threads = 1;
1911
1912        th_arg.merge_input = env->merge_vals;
1913    }
1914
1915    env->args->result->data = env->merge_vals[0].arr;
1916    env->args->result->length = env->merge_vals[0].len;
1917
1918    mem_free(env->merge_vals);
1919}
1920
1921static inline mr_env_t* get_env (void)
1922{
1923    return (mr_env_t*)pthread_getspecific (env_key);
1924}
Note: See TracBrowser for help on using the repository browser.