source: trunk/sys/libphoenix/include/map_reduce.h @ 87

Last change on this file since 87 was 1, checked in by alain, 8 years ago

First import

File size: 8.2 KB
RevLine 
[1]1/* Copyright (c) 2007-2009, Stanford University
2* All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*     * Redistributions of source code must retain the above copyright
7*       notice, this list of conditions and the following disclaimer.
8*     * Redistributions in binary form must reproduce the above copyright
9*       notice, this list of conditions and the following disclaimer in the
10*       documentation and/or other materials provided with the distribution.
11*     * Neither the name of Stanford University nor the
12*       names of its contributors may be used to endorse or promote products
13*       derived from this software without specific prior written permission.
14*
15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25*/ 
26
27#ifndef MAP_REDUCE_H_
28#define MAP_REDUCE_H_
#include <sys/types.h>
#include <stdbool.h>
#include <stdint.h>
31
/* Standard data types for the function arguments and results */
33 
/* Argument to the map function. This is specified by the splitter function.
 * length - number of elements of data. The default splitter function gives
 *          length in terms of the # of elements of unit_size bytes.
 * data   - data to process, of a user defined type
 *
 * intptr_t comes from <stdint.h>.
 */
typedef struct
{
   intptr_t length;
   void *data;
} map_args_t;
44
/* Single element of result.
 * key - pointer to the key
 * val - pointer to the value
 */
typedef struct
{
   void *key;
   void *val;
} keyval_t;
54
55/* List of results
56 * length - number of key value pairs
57 * data - array of key value pairs
58 */
59typedef struct
60{
61   int length;
62   keyval_t *data;
63} final_data_t;
64
/* Scheduler function pointer type definitions */
66
67/* Map function takes in map_args_t, as supplied by the splitter
68 * and emit_intermediate() should be called on any key value pairs
69 * in the intermediate result set.
70 */
71typedef void (*map_t)(map_args_t *);
72
/* Opaque iterator type: only the forward declaration is visible in this
 * header. Reduce and combiner functions receive a pointer to one.
 */
struct iterator_t;
typedef struct iterator_t iterator_t;

/* NOTE(review): presumably advances itr and stores the next value through
 * the second argument; the meaning of the return value is not documented
 * in this header -- confirm against the implementation. */
int iter_next (iterator_t *itr, void **);

/* NOTE(review): presumably the number of values itr covers -- confirm
 * against the implementation. */
int iter_size (iterator_t *itr);
77
78/* Reduce function takes in a key pointer, a list of value pointers, and a
79 * length of the list. emit() should be called on any key value pairs
80 * in the result set.
81 */
82typedef void (*reduce_t)(void *, iterator_t *itr);
83
84/* Combiner function takes in an iterator for a particular key,
85 * and returns a reduced value. The operation should be identical to the
86 * reduce function, except that this function returns the reduced value
87 * directly. */
88typedef void *(*combiner_t)(iterator_t *itr);
89
90/* Splitter function takes in a pointer to the input data, an interger of
91 * the number of bytes requested, and an uninitialized pointer to a
92 * map_args_t pointer. The result is stored in map_args_t. The splitter
93 * should return 1 if the result is valid or 0 if there is no more data.
94 */
95typedef int (*splitter_t)(void *, int, map_args_t *);
96
97/* Locator function takes in a pointer to map_args_t, and returns
98 * the memory address where this map task would be heavily accessing.
99 * The runtime would then try to dispatch the task to a thread that
100 * is nearby the physical memory that backs the address. */
101typedef void* (*locator_t)(map_args_t *);
102
/* Partition function takes in the number of reduce tasks, a pointer to
 * a key, and the length of the key in bytes. It assigns a key to a reduce
 * task. The value returned is the # of the reduce task where the key will
 * be processed. This value should be the same for all keys that are equal.
 */
typedef int (*partition_t)(int, void *, int);
109
/* key_cmp(key1, key2) returns:
 *   0 if key1 == key2
 *   + if key1 > key2
 *   - if key1 < key2
 */
typedef int (*key_cmp_t)(const void *, const void*);
116
117/* The arguments to operate the runtime. */
118typedef struct
119{
120    void * task_data;           /* The data to run MapReduce on.
121                                 * If splitter is NULL, this should be an array. */
122    off_t data_size;            /* Total # of bytes of data */
123    int unit_size;              /* # of bytes for one element
124                                 * (if necessary, on average) */
125
126    map_t map;                  /* Map function pointer, must be user defined */
127    reduce_t reduce;            /* If NULL, identity reduce function is used,
128                                 * which emits a keyval pair for each val. */
129    combiner_t combiner;        /* If NULL, no combiner would be called. */                             
130    splitter_t splitter;        /* If NULL, the array splitter is used.*/
131    locator_t locator;          /* If NULL, no locality based optimization is
132                                   performed. */
133    key_cmp_t key_cmp;          /* Key comparison function.
134                                   Must be user defined.*/
135
136    final_data_t *result;       /* Pointer to output data.
137                                 * Must be allocated by user */
138
139    /*** Optional arguments must be zero if not used ***/
140    partition_t partition;      /* Default partition function is a
141                                 * hash function */
142
143    /* Creates one emit queue for each reduce task,
144    * instead of per reduce thread. This improves
145    * time to emit if data is emitted in order,
146    * but can increase merge time. */
147    bool use_one_queue_per_task;   
148
149    int L1_cache_size;     /* Size of L1 cache in bytes */
150    int num_map_threads;   /* # of threads to run map tasks on.
151                                 * Default is one per processor */
152    int num_reduce_threads;     /* # of threads to run reduce tasks on.
153    * Default is one per processor */
154    int num_merge_threads;      /* # of threads to run merge tasks on.
155    * Default is one per processor */
156    int num_procs;              /* Maximum number of processors to use. */
157
158    int proc_offset;            /* number of procs to skip for thread binding */
159                                /* (useful if you have multiple MR's running
160                                 *  and you don't want them binding to the same
161                                 *  hardware thread). */
162
163    float key_match_factor;     /* Magic number that describes the ratio of
164    * the input data size to the output data size.
165    * This is used as a hint. */
166} map_reduce_args_t;
167
/* Runtime defined functions. */
169
/* MapReduce initialization function. Called once per process.
 * Takes no arguments; declared with (void) so the compiler rejects calls
 * that pass any.
 * NOTE(review): the meaning of the return value is not documented in this
 * header -- confirm against the implementation.
 */
int map_reduce_init (void);
172
/* MapReduce finalization function. Called once per process.
 * Takes no arguments; declared with (void) so the compiler rejects calls
 * that pass any.
 * NOTE(review): the meaning of the return value is not documented in this
 * header -- confirm against the implementation.
 */
int map_reduce_finalize (void);
175
176/* The main MapReduce engine. This is the function called by the application.
177 * It is responsible for creating and scheduling all map and reduce tasks, and
178 * also organizes and maintains the data which is passed from application to
179 * map tasks, map tasks to reduce tasks, and reduce tasks back to the
180 * application. Results are stored in args->result. A return value less than zero
181 * represents an error. This function is not thread safe.
182 */   
183int map_reduce (map_reduce_args_t * args);
184
/* This should be called from the map function. It stores a key with key_size
 * bytes and a value in the intermediate queues for processing by the reduce
 * task. The runtime will call the partition function to assign the key to a
 * reduce task.
 */
void emit_intermediate(void *key, void *val, int key_size);
191
/* This should be called from the reduce function. It stores a key and a value
 * in the reduce queue. This will be in the final result array.
 */
void emit(void *key, void *val);
196
/* This is the built in partition function, which is a hash. It is global so
 * that a user defined partition function can call it.
 */
int default_partition(int reduce_tasks, void* key, int key_size);
201
202#endif // MAP_REDUCE_H_
Note: See TracBrowser for help on using the repository browser.