[1] | 1 | /* Copyright (c) 2007-2009, Stanford University |
---|
| 2 | * All rights reserved. |
---|
| 3 | * |
---|
| 4 | * Redistribution and use in source and binary forms, with or without |
---|
| 5 | * modification, are permitted provided that the following conditions are met: |
---|
| 6 | * * Redistributions of source code must retain the above copyright |
---|
| 7 | * notice, this list of conditions and the following disclaimer. |
---|
| 8 | * * Redistributions in binary form must reproduce the above copyright |
---|
| 9 | * notice, this list of conditions and the following disclaimer in the |
---|
| 10 | * documentation and/or other materials provided with the distribution. |
---|
| 11 | * * Neither the name of Stanford University nor the |
---|
| 12 | * names of its contributors may be used to endorse or promote products |
---|
| 13 | * derived from this software without specific prior written permission. |
---|
| 14 | * |
---|
| 15 | * THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY |
---|
| 16 | * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
---|
| 17 | * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
---|
| 18 | * DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY |
---|
| 19 | * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
---|
| 20 | * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
---|
| 21 | * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
---|
| 22 | * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
---|
| 23 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
| 24 | * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
| 25 | */ |
---|
| 26 | |
---|
| 27 | #ifndef MAP_REDUCE_H_ |
---|
| 28 | #define MAP_REDUCE_H_ |
---|
#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>
---|
| 31 | |
---|
| 32 | /* Standard data types for the function arguments and results */ |
---|
| 33 | |
---|
/* Argument handed to the map function; it is produced by the splitter.
 *
 * length - number of elements in data. The default splitter expresses
 *          this as a count of elements of unit_size bytes each.
 * data   - the data to process, of a user defined type.
 *
 * intptr_t comes from <stdint.h>; <sys/types.h> alone is not required to
 * provide it, so this header must include <stdint.h> itself.
 */
typedef struct {
    intptr_t length;
    void *data;
} map_args_t;
---|
| 44 | |
---|
/* One element of a result set.
 *
 * key - pointer to the key
 * val - pointer to the value
 */
typedef struct {
    void *key;
    void *val;
} keyval_t;
---|
| 54 | |
---|
| 55 | /* List of results |
---|
| 56 | * length - number of key value pairs |
---|
| 57 | * data - array of key value pairs |
---|
| 58 | */ |
---|
| 59 | typedef struct |
---|
| 60 | { |
---|
| 61 | int length; |
---|
| 62 | keyval_t *data; |
---|
| 63 | } final_data_t; |
---|
| 64 | |
---|
| 65 | /* Scheduler function pointer type definitions */ |
---|
| 66 | |
---|
| 67 | /* Map function takes in map_args_t, as supplied by the splitter |
---|
| 68 | * and emit_intermediate() should be called on any key value pairs |
---|
| 69 | * in the intermediate result set. |
---|
| 70 | */ |
---|
| 71 | typedef void (*map_t)(map_args_t *); |
---|
| 72 | |
---|
/* Opaque iterator handle; struct iterator_t is only forward-declared here. */
typedef struct iterator_t iterator_t;

/* Iterator accessors.
 * NOTE(review): this header does not describe their return values.
 * iter_next presumably stores the next value through val and reports whether
 * one was available, and iter_size presumably returns the number of values;
 * confirm against the implementation.
 */
int iter_next(iterator_t *itr, void **val);
int iter_size(iterator_t *itr);
---|
| 77 | |
---|
| 78 | /* Reduce function takes in a key pointer, a list of value pointers, and a |
---|
| 79 | * length of the list. emit() should be called on any key value pairs |
---|
| 80 | * in the result set. |
---|
| 81 | */ |
---|
| 82 | typedef void (*reduce_t)(void *, iterator_t *itr); |
---|
| 83 | |
---|
| 84 | /* Combiner function takes in an iterator for a particular key, |
---|
| 85 | * and returns a reduced value. The operation should be identical to the |
---|
| 86 | * reduce function, except that this function returns the reduced value |
---|
| 87 | * directly. */ |
---|
| 88 | typedef void *(*combiner_t)(iterator_t *itr); |
---|
| 89 | |
---|
| 90 | /* Splitter function takes in a pointer to the input data, an interger of |
---|
| 91 | * the number of bytes requested, and an uninitialized pointer to a |
---|
| 92 | * map_args_t pointer. The result is stored in map_args_t. The splitter |
---|
| 93 | * should return 1 if the result is valid or 0 if there is no more data. |
---|
| 94 | */ |
---|
| 95 | typedef int (*splitter_t)(void *, int, map_args_t *); |
---|
| 96 | |
---|
| 97 | /* Locator function takes in a pointer to map_args_t, and returns |
---|
| 98 | * the memory address where this map task would be heavily accessing. |
---|
| 99 | * The runtime would then try to dispatch the task to a thread that |
---|
| 100 | * is nearby the physical memory that backs the address. */ |
---|
| 101 | typedef void* (*locator_t)(map_args_t *); |
---|
| 102 | |
---|
/* Partition function type: assigns a key to a reduce task.
 *
 * reduce_tasks - the number of reduce tasks
 * key          - pointer to the key
 * key_size     - length of the key in bytes
 *
 * Returns the number of the reduce task where the key will be processed.
 * The value must be the same for all keys that are equal.
 */
typedef int (*partition_t)(int reduce_tasks, void *key, int key_size);
---|
| 109 | |
---|
/* Key comparison function type. key_cmp(key1, key2) returns:
 *   zero     if key1 == key2
 *   positive if key1 >  key2
 *   negative if key1 <  key2
 */
typedef int (*key_cmp_t)(const void *key1, const void *key2);
---|
| 116 | |
---|
| 117 | /* The arguments to operate the runtime. */ |
---|
| 118 | typedef struct |
---|
| 119 | { |
---|
| 120 | void * task_data; /* The data to run MapReduce on. |
---|
| 121 | * If splitter is NULL, this should be an array. */ |
---|
| 122 | off_t data_size; /* Total # of bytes of data */ |
---|
| 123 | int unit_size; /* # of bytes for one element |
---|
| 124 | * (if necessary, on average) */ |
---|
| 125 | |
---|
| 126 | map_t map; /* Map function pointer, must be user defined */ |
---|
| 127 | reduce_t reduce; /* If NULL, identity reduce function is used, |
---|
| 128 | * which emits a keyval pair for each val. */ |
---|
| 129 | combiner_t combiner; /* If NULL, no combiner would be called. */ |
---|
| 130 | splitter_t splitter; /* If NULL, the array splitter is used.*/ |
---|
| 131 | locator_t locator; /* If NULL, no locality based optimization is |
---|
| 132 | performed. */ |
---|
| 133 | key_cmp_t key_cmp; /* Key comparison function. |
---|
| 134 | Must be user defined.*/ |
---|
| 135 | |
---|
| 136 | final_data_t *result; /* Pointer to output data. |
---|
| 137 | * Must be allocated by user */ |
---|
| 138 | |
---|
| 139 | /*** Optional arguments must be zero if not used ***/ |
---|
| 140 | partition_t partition; /* Default partition function is a |
---|
| 141 | * hash function */ |
---|
| 142 | |
---|
| 143 | /* Creates one emit queue for each reduce task, |
---|
| 144 | * instead of per reduce thread. This improves |
---|
| 145 | * time to emit if data is emitted in order, |
---|
| 146 | * but can increase merge time. */ |
---|
| 147 | bool use_one_queue_per_task; |
---|
| 148 | |
---|
| 149 | int L1_cache_size; /* Size of L1 cache in bytes */ |
---|
| 150 | int num_map_threads; /* # of threads to run map tasks on. |
---|
| 151 | * Default is one per processor */ |
---|
| 152 | int num_reduce_threads; /* # of threads to run reduce tasks on. |
---|
| 153 | * Default is one per processor */ |
---|
| 154 | int num_merge_threads; /* # of threads to run merge tasks on. |
---|
| 155 | * Default is one per processor */ |
---|
| 156 | int num_procs; /* Maximum number of processors to use. */ |
---|
| 157 | |
---|
| 158 | int proc_offset; /* number of procs to skip for thread binding */ |
---|
| 159 | /* (useful if you have multiple MR's running |
---|
| 160 | * and you don't want them binding to the same |
---|
| 161 | * hardware thread). */ |
---|
| 162 | |
---|
| 163 | float key_match_factor; /* Magic number that describes the ratio of |
---|
| 164 | * the input data size to the output data size. |
---|
| 165 | * This is used as a hint. */ |
---|
| 166 | } map_reduce_args_t; |
---|
| 167 | |
---|
| 168 | /* Runtime defined functions. */ |
---|
| 169 | |
---|
/* MapReduce initialization function. Called once per process.
 *
 * Takes no arguments. It is declared with (void) so the compiler checks that
 * callers pass none; an empty parameter list leaves the arguments unchecked
 * in C before C23.
 * NOTE(review): the meaning of the int return value is not described in this
 * header; confirm against the implementation.
 */
int map_reduce_init(void);
---|
| 172 | |
---|
/* MapReduce finalization function. Called once per process.
 *
 * Takes no arguments. It is declared with (void) so the compiler checks that
 * callers pass none; an empty parameter list leaves the arguments unchecked
 * in C before C23.
 * NOTE(review): the meaning of the int return value is not described in this
 * header; confirm against the implementation.
 */
int map_reduce_finalize(void);
---|
| 175 | |
---|
| 176 | /* The main MapReduce engine. This is the function called by the application. |
---|
| 177 | * It is responsible for creating and scheduling all map and reduce tasks, and |
---|
| 178 | * also organizes and maintains the data which is passed from application to |
---|
| 179 | * map tasks, map tasks to reduce tasks, and reduce tasks back to the |
---|
| 180 | * application. Results are stored in args->result. A return value less than zero |
---|
| 181 | * represents an error. This function is not thread safe. |
---|
| 182 | */ |
---|
| 183 | int map_reduce (map_reduce_args_t * args); |
---|
| 184 | |
---|
/* To be called from the map function.
 *
 * Stores a key of key_size bytes, together with a value, in the intermediate
 * queues for processing by the reduce task. The runtime calls the partition
 * function to assign the key to a reduce task.
 */
void emit_intermediate(void *key, void *val, int key_size);
---|
| 191 | |
---|
/* To be called from the reduce function.
 *
 * Stores a key and a value in the reduce queue; the pair will appear in the
 * final result array.
 */
void emit(void *key, void *val);
---|
| 196 | |
---|
/* The built-in partition function, which is a hash.
 *
 * It is global so that a user defined partition function can call it.
 */
int default_partition(int reduce_tasks, void *key, int key_size);
---|
| 201 | |
---|
| 202 | #endif // MAP_REDUCE_H_ |
---|