1 | /* Copyright (c) 2007-2009, Stanford University |
---|
2 | * All rights reserved. |
---|
3 | * |
---|
4 | * Redistribution and use in source and binary forms, with or without |
---|
5 | * modification, are permitted provided that the following conditions are met: |
---|
6 | * * Redistributions of source code must retain the above copyright |
---|
7 | * notice, this list of conditions and the following disclaimer. |
---|
8 | * * Redistributions in binary form must reproduce the above copyright |
---|
9 | * notice, this list of conditions and the following disclaimer in the |
---|
10 | * documentation and/or other materials provided with the distribution. |
---|
11 | * * Neither the name of Stanford University nor the |
---|
12 | * names of its contributors may be used to endorse or promote products |
---|
13 | * derived from this software without specific prior written permission. |
---|
14 | * |
---|
15 | * THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY |
---|
16 | * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
---|
17 | * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
---|
18 | * DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY |
---|
19 | * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
---|
20 | * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
---|
21 | * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
---|
22 | * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
---|
23 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
24 | * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
25 | */ |
---|
26 | |
---|
27 | #ifndef MAP_REDUCE_H_ |
---|
28 | #define MAP_REDUCE_H_ |
---|
#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>
---|
31 | |
---|
32 | /* Standard data types for the function arguments and results */ |
---|
33 | |
---|
/*
 * map_args_t - one unit of work handed to a map task.
 * Instances are produced by the splitter function.
 */
typedef struct {
    intptr_t length;  /* Number of elements in `data`. The default splitter
                       * counts elements that are unit_size bytes each. */
    void *data;       /* User-defined payload for the map task to process. */
} map_args_t;
---|
44 | |
---|
/*
 * keyval_t - a single entry of a result set.
 */
typedef struct {
    void *key;  /* Points at the key. */
    void *val;  /* Points at the value. */
} keyval_t;

/*
 * final_data_t - the full list of results.
 */
typedef struct {
    int length;      /* How many key/value pairs `data` holds. */
    keyval_t *data;  /* Array of the key/value pairs. */
} final_data_t;
---|
64 | |
---|
65 | /* Scheduler function pointer type definitions */ |
---|
66 | |
---|
67 | /* Map function takes in map_args_t, as supplied by the splitter |
---|
68 | * and emit_intermediate() should be called on any key value pairs |
---|
69 | * in the intermediate result set. |
---|
70 | */ |
---|
71 | typedef void (*map_t)(map_args_t *); |
---|
72 | |
---|
/*
 * iterator_t - opaque iterator type. The struct is left incomplete here, so
 * users only handle it through pointers and the accessors below.
 */
typedef struct iterator_t iterator_t;

/*
 * iter_next - advance `itr`; presumably stores the next value through the
 * `val` out-parameter.
 * NOTE(review): the meaning of the int return value (e.g. nonzero while
 * values remain) is defined by the runtime implementation; confirm there.
 */
int iter_next(iterator_t *itr, void **val);

/*
 * iter_size - presumably the number of values reachable through `itr`;
 * confirm against the runtime implementation.
 */
int iter_size(iterator_t *itr);
---|
77 | |
---|
78 | /* Reduce function takes in a key pointer, a list of value pointers, and a |
---|
79 | * length of the list. emit() should be called on any key value pairs |
---|
80 | * in the result set. |
---|
81 | */ |
---|
82 | typedef void (*reduce_t)(void *, iterator_t *itr); |
---|
83 | |
---|
84 | /* Combiner function takes in an iterator for a particular key, |
---|
85 | * and returns a reduced value. The operation should be identical to the |
---|
86 | * reduce function, except that this function returns the reduced value |
---|
87 | * directly. */ |
---|
88 | typedef void *(*combiner_t)(iterator_t *itr); |
---|
89 | |
---|
90 | /* Splitter function takes in a pointer to the input data, an interger of |
---|
91 | * the number of bytes requested, and an uninitialized pointer to a |
---|
92 | * map_args_t pointer. The result is stored in map_args_t. The splitter |
---|
93 | * should return 1 if the result is valid or 0 if there is no more data. |
---|
94 | */ |
---|
95 | typedef int (*splitter_t)(void *, int, map_args_t *); |
---|
96 | |
---|
97 | /* Locator function takes in a pointer to map_args_t, and returns |
---|
98 | * the memory address where this map task would be heavily accessing. |
---|
99 | * The runtime would then try to dispatch the task to a thread that |
---|
100 | * is nearby the physical memory that backs the address. */ |
---|
101 | typedef void* (*locator_t)(map_args_t *); |
---|
102 | |
---|
/* Partition function takes in the number of reduce tasks, a pointer to
 * a key, and the length of the key in bytes. It assigns a key to a reduce
 * task: the value returned is the # of the reduce task where the key will
 * be processed. This value should be the same for all keys that are equal.
 * Parameter names mirror default_partition(), which has this same shape.
 */
typedef int (*partition_t)(int reduce_tasks, void *key, int key_size);
---|
109 | |
---|
/*
 * key_cmp_t - ordering over keys. key_cmp(key1, key2) yields:
 *   a negative value  when key1 <  key2
 *   zero              when key1 == key2
 *   a positive value  when key1 >  key2
 */
typedef int (*key_cmp_t)(const void *key1, const void *key2);
---|
116 | |
---|
117 | /* The arguments to operate the runtime. */ |
---|
118 | typedef struct |
---|
119 | { |
---|
120 | void * task_data; /* The data to run MapReduce on. |
---|
121 | * If splitter is NULL, this should be an array. */ |
---|
122 | off_t data_size; /* Total # of bytes of data */ |
---|
123 | int unit_size; /* # of bytes for one element |
---|
124 | * (if necessary, on average) */ |
---|
125 | |
---|
126 | map_t map; /* Map function pointer, must be user defined */ |
---|
127 | reduce_t reduce; /* If NULL, identity reduce function is used, |
---|
128 | * which emits a keyval pair for each val. */ |
---|
129 | combiner_t combiner; /* If NULL, no combiner would be called. */ |
---|
130 | splitter_t splitter; /* If NULL, the array splitter is used.*/ |
---|
131 | locator_t locator; /* If NULL, no locality based optimization is |
---|
132 | performed. */ |
---|
133 | key_cmp_t key_cmp; /* Key comparison function. |
---|
134 | Must be user defined.*/ |
---|
135 | |
---|
136 | final_data_t *result; /* Pointer to output data. |
---|
137 | * Must be allocated by user */ |
---|
138 | |
---|
139 | /*** Optional arguments must be zero if not used ***/ |
---|
140 | partition_t partition; /* Default partition function is a |
---|
141 | * hash function */ |
---|
142 | |
---|
143 | /* Creates one emit queue for each reduce task, |
---|
144 | * instead of per reduce thread. This improves |
---|
145 | * time to emit if data is emitted in order, |
---|
146 | * but can increase merge time. */ |
---|
147 | bool use_one_queue_per_task; |
---|
148 | |
---|
149 | int L1_cache_size; /* Size of L1 cache in bytes */ |
---|
150 | int num_map_threads; /* # of threads to run map tasks on. |
---|
151 | * Default is one per processor */ |
---|
152 | int num_reduce_threads; /* # of threads to run reduce tasks on. |
---|
153 | * Default is one per processor */ |
---|
154 | int num_merge_threads; /* # of threads to run merge tasks on. |
---|
155 | * Default is one per processor */ |
---|
156 | int num_procs; /* Maximum number of processors to use. */ |
---|
157 | |
---|
158 | int proc_offset; /* number of procs to skip for thread binding */ |
---|
159 | /* (useful if you have multiple MR's running |
---|
160 | * and you don't want them binding to the same |
---|
161 | * hardware thread). */ |
---|
162 | |
---|
163 | float key_match_factor; /* Magic number that describes the ratio of |
---|
164 | * the input data size to the output data size. |
---|
165 | * This is used as a hint. */ |
---|
166 | } map_reduce_args_t; |
---|
167 | |
---|
168 | /* Runtime defined functions. */ |
---|
169 | |
---|
/* MapReduce initialization function. Called once per process.
 * The explicit (void) makes this a real prototype, so the compiler rejects
 * calls that pass arguments. Before C23, an empty "()" declares a function
 * with unspecified parameters and disables that check.
 * NOTE(review): the meaning of the int status is not documented here;
 * confirm against the implementation. */
int map_reduce_init(void);

/* MapReduce finalization function. Called once per process.
 * Declared with (void) for the same reason as map_reduce_init(). */
int map_reduce_finalize(void);
---|
175 | |
---|
176 | /* The main MapReduce engine. This is the function called by the application. |
---|
177 | * It is responsible for creating and scheduling all map and reduce tasks, and |
---|
178 | * also organizes and maintains the data which is passed from application to |
---|
179 | * map tasks, map tasks to reduce tasks, and reduce tasks back to the |
---|
180 | * application. Results are stored in args->result. A return value less than zero |
---|
181 | * represents an error. This function is not thread safe. |
---|
182 | */ |
---|
183 | int map_reduce (map_reduce_args_t * args); |
---|
184 | |
---|
/* This should be called from the map function. It stores a key of key_size
 * bytes and a value in the intermediate queues for processing by the reduce
 * task. The runtime will call the partition function to assign the key to a
 * reduce task.
 */
void emit_intermediate(void *key, void *val, int key_size);
---|
191 | |
---|
/*
 * emit - record one final key/value pair.
 * Call it from the reduce function. The pair goes into the reduce queue and
 * ends up in the final result array.
 */
void emit(void *key, void *val);
---|
196 | |
---|
/*
 * default_partition - the runtime's built-in, hash-based partition function.
 * It has external linkage so that a user-defined partition function can
 * delegate to it.
 */
int default_partition(int reduce_tasks, void *key, int key_size);
---|
201 | |
---|
202 | #endif // MAP_REDUCE_H_ |
---|