source: trunk/user/sort/sort.c @ 683

Last change on this file since 683 was 676, checked in by alain, 4 years ago

Introduce chat application to test the named pipes.

  • Property svn:executable set to *
File size: 14.0 KB
Line 
1/*
2 * sort.c - Parallel sort
3 *
4 * Author     Cesar Fuguet Tortolero (2013)
5 *            Alain Greiner (2019)
6 *
7 * Copyright (c) UPMC Sorbonne Universites
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; version 2.0 of the License.
12 *
13 * It is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 * General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
///////////////////////////////////////////////////////////////////////////////
// This multi-threaded application implements a multi-stage sort.
// It was written by Cesar Fuguet Tortolero in 2013.
// It was ported to ALMOS-MKH by Alain Greiner in 2019.
//
// There is one thread per physical core.
// Computation is organised as a binary tree:
// - All threads execute in parallel a bubble sort on a sub-array during
//   the first stage of the parallel sort,
// - The number of participating threads is divided by 2 at each next stage,
//   to make a merge sort on two subsets of the previous stage.
//
//       Number_of_stages = number of barriers = log2(Number_of_threads) + 1
//       (one bubble sort stage, then log2(Number_of_threads) merge stages)
//
// The various stages are separated by synchronisation barriers, and the
// main thread uses the join syscall to check that all threads completed
// before printing the computation time (sequential & parallel).
// These results can be - optionally - registered in an instrumentation file.
//
// Constraints :
// - It supports up to 1024 cores: x_size, y_size, and ncores must be
//   powers of 2 (max 16*16 clusters / max 4 cores per cluster)
// - The array of values to be sorted (ARRAY_LENGTH) must be a power of 2
//   larger than the number of cores.
///////////////////////////////////////////////////////////////////////////////
48
49#include <stdio.h>
50#include <stdlib.h>
51#include <unistd.h>
52#include <pthread.h>
53#include <almosmkh.h>
54#include <hal_macros.h>
55
56#define ARRAY_LENGTH        64              // number of items
57#define MAX_THREADS         1024            // 16 * 16 * 4
58
59#define X_MAX               16              // max number of clusters in a row
60#define Y_MAX               16              // max number of clusters in a column
61#define CORES_MAX           4               // max number of cores in a cluster
62#define CLUSTERS_MAX        X_MAX * Y_MAX
63
64#define USE_DQT_BARRIER     1               // use DQT barrier if non zero
65#define DISPLAY_ARRAY       0               // display items values before and after
66#define DEBUG_MAIN          0               // trace main function
67#define DEBUG_SORT          0               // trace sort function
68#define CHECK_RESULT        0               // for debug
69#define INSTRUMENTATION     1               // register computation times on file
70
71////////////////////////////////////////////////////////////////////////////////////
72//            Sort specific global variables
73////////////////////////////////////////////////////////////////////////////////////
74
75int                 array0[ARRAY_LENGTH];    // values to sort
76int                 array1[ARRAY_LENGTH];   
77
78unsigned int        threads;                // total number of working threads
79
80pthread_barrier_t   barrier;                 // synchronisation variables
81
82/////////////////////////////////////////////////////////////////////////////////////
83//             Global variables required by parallel_pthread_create()
84/////////////////////////////////////////////////////////////////////////////////////
85
86
87////////////////////////////////////
// Sort in place, in ascending order, the <length> items of <array> starting
// at index <init_pos>, i.e. the slice array[init_pos .. init_pos + length - 1].
// Items outside this slice are never read nor written.
// - array    : base address of the array containing the slice.
// - length   : number of items in the slice (0 or 1 : nothing to do).
// - init_pos : index of the first item of the slice.
static void bubbleSort( int * array,
                        unsigned int length,
                        unsigned int init_pos )
{
    unsigned int pass;
    unsigned int j;
    unsigned int last;
    int          aux;
    int          swapped;

    // a slice of 0 or 1 item is already sorted
    // (this also keeps "length - 1" below from wrapping around)
    if( length < 2 ) return;

    // after pass p, the (p + 1) largest items are at their final place,
    // so (length - 1) passes are always enough
    for( pass = 0; pass < (length - 1); pass++ )
    {
        last    = init_pos + length - pass - 1;
        swapped = 0;

        for( j = init_pos; j < last; j++ )
        {
            if( array[j] > array[j + 1] )
            {
                aux          = array[j + 1];
                array[j + 1] = array[j];
                array[j]     = aux;
                swapped      = 1;
            }
        }

        // no exchange during a full pass : the slice is sorted
        if( swapped == 0 ) break;
    }
}  // end bubbleSort()
109
110
111///////////////////////////////////
// Merge two sorted subsets A and B of <src>, each containing <length> items,
// into <length * 2> consecutive items of <dst>, in ascending order.
// On equal keys the item of subset A is copied first.
// Nothing is read or written when <length> is not strictly positive.
// - src            : source array (not modified).
// - dst            : destination array.
// - length         : number of items in each subset.
// - init_pos_src_a : index in <src> of the first item of subset A.
// - init_pos_src_b : index in <src> of the first item of subset B.
// - init_pos_dst   : index in <dst> of the first item written.
static void merge( const int * src,               // source array
                   int       * dst,               // destination array
                   int         length,            // number of items in a subset
                   int         init_pos_src_a,    // index first item in src subset A
                   int         init_pos_src_b,    // index first item in src subset B
                   int         init_pos_dst )     // index first item in destination
{
    int a     = init_pos_src_a;        // read cursor in subset A
    int b     = init_pos_src_b;        // read cursor in subset B
    int a_end = a + length;            // one past the last item of subset A
    int b_end = b + length;            // one past the last item of subset B
    int k     = init_pos_dst;          // write cursor in destination

    // both subsets still have items : copy the smallest head
    while( (a < a_end) && (b < b_end) )
    {
        if( src[b] < src[a] ) dst[k++] = src[b++];
        else                  dst[k++] = src[a++];
    }

    // at most one of the two subsets still has items : copy its tail
    while( a < a_end ) dst[k++] = src[a++];
    while( b < b_end ) dst[k++] = src[b++];

}  // end merge()
154
155///////////////////////////////
156void * sort( void * arguments )
157{
158    unsigned int        i;
159    int               * src_array  = NULL;
160    int               * dst_array  = NULL;
161
162    // get arguments
163    pthread_parallel_work_args_t * ptr = (pthread_parallel_work_args_t *)arguments;
164
165    unsigned int        tid            = ptr->tid;
166    pthread_barrier_t * parent_barrier = ptr->barrier;
167
168    unsigned int        items      = ARRAY_LENGTH / threads;
169    unsigned int        stages     = __builtin_ctz( threads ) + 1;
170
171#if DEBUG_SORT
172printf("\n[sort] start : ptr %x / tid %d / threads %d / parent_barrier %x\n",
173ptr, tid, threads, parent_barrier );
174#endif
175
176    bubbleSort( array0, items, items * tid );
177
178#if DEBUG_SORT
179printf("\n[sort] thread[%d] : stage 0 completed\n", tid );
180#endif
181
182    /////////////////////////////////
183    pthread_barrier_wait( &barrier ); 
184
185#if DEBUG_SORT
186printf("\n[sort] thread[%d] exit barrier 0\n", tid );
187#endif
188
189    // the number of threads contributing to sort is divided by 2
190    // and the number of items is multiplied by 2 at each next stage
191    for ( i = 1 ; i < stages ; i++ )
192    {
193        if((i % 2) == 1)               // odd stage
194        {
195            src_array = array0;
196            dst_array = array1;
197        }
198        else                           // even stage
199        {
200            src_array = array1;
201            dst_array = array0;
202        }
203
204        if( (tid & ((1<<i)-1)) == 0 )
205        {
206
207#if DEBUG_SORT
208printf("\n[sort] thread[%d] : stage %d start\n", tid , i );
209#endif
210            merge( src_array, 
211                   dst_array,
212                   items << (i-1),
213                   items * tid,
214                   items * (tid + (1 << (i-1))),
215                   items * tid );
216
217#if DEBUG_SORT
218printf("\n[sort] thread[%d] : stage %d completed\n", tid , i );
219#endif
220        }
221
222        /////////////////////////////////
223        pthread_barrier_wait( &barrier );
224
225#if DEBUG_SORT
226printf("\n[sort] thread[%d] exit barrier %d\n", tid , i );
227#endif
228
229    }  // en for stages
230
231    // sort thread signal completion to pthtread_parallel_create()
232    pthread_barrier_wait( parent_barrier );
233
234#if DEBUG_SORT
235printf("\n[sort] thread[%d] exit\n", tid );
236#endif
237
238    // sort thread exit
239    pthread_exit( NULL );
240
241    return NULL;
242
243} // end sort()
244
245
246////////////////
247int main( void )
248{
249    int                    error;
250    unsigned int           x_size;             // number of rows
251    unsigned int           y_size;             // number of columns
252    unsigned int           ncores;             // number of cores per cluster
253    pthread_barrierattr_t  barrier_attr;       // barrier attributes (used for DQT)
254    unsigned int           n;                  // index in array to sort
255
256    unsigned long long     start_cycle;
257    unsigned long long     seq_end_cycle;
258    unsigned long long     para_end_cycle;
259
260    /////////////////////////
261    get_cycle( &start_cycle );
262 
263    // compute number of working threads (one thread per core)
264    hard_config_t  config;
265    get_config( &config );
266    x_size  = config.x_size;
267    y_size  = config.y_size;
268    ncores  = config.ncores;
269    threads = x_size * y_size * ncores;
270
271    // compute covering DQT size an level
272    unsigned int z = (x_size > y_size) ? x_size : y_size;
273    unsigned int root_level = (z == 1) ? 0 : (z == 2) ? 1 : (z == 4) ? 2 : (z == 8) ? 3 : 4;
274
275    // checks number of threads
276    if ( (threads != 1)   && (threads != 2)   && (threads != 4)   && 
277         (threads != 8)   && (threads != 16 ) && (threads != 32)  && 
278         (threads != 64)  && (threads != 128) && (threads != 256) && 
279         (threads != 512) && (threads != 1024) )
280    {
281        printf("\n[sort] ERROR : number of cores must be power of 2\n");
282        exit( 0 );
283    }
284
285    // check array size
286    if ( ARRAY_LENGTH % threads) 
287    {
288        printf("\n[sort] ERROR : array size must be multiple of number of threads\n");
289        exit( 0 );
290    }
291
292    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
293    threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
294
295    // initialize barrier
296    if( USE_DQT_BARRIER )
297    {
298        barrier_attr.x_size   = x_size; 
299        barrier_attr.y_size   = y_size;
300        barrier_attr.nthreads = ncores;
301        error = pthread_barrier_init( &barrier, &barrier_attr , threads );
302    }
303    else // use SIMPLE_BARRIER
304    {
305        error = pthread_barrier_init( &barrier, NULL , threads );
306    }
307
308    if( error )
309    {
310        printf("\n[sort] ERROR : cannot initialise barrier\n" );
311        exit( 0 );
312    }
313
314#if DEBUG_MAIN
315if( USE_DQT_BARRIER ) printf("\n[sort] main completes DQT barrier init\n");
316else                  printf("\n[sort] main completes simple barrier init\n");
317#endif
318
319    // Array to sort initialization
320    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
321    {
322        array0[n] = ARRAY_LENGTH - n - 1;
323    }
324
325#if DISPLAY_ARRAY
326    printf("\n*** array before sort\n");
327    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
328#endif
329
330#if DEBUG_MAIN
331printf("\n[sort] main completes array init\n");
332#endif
333
334    ///////////////////////////
335    get_cycle( &seq_end_cycle );
336
337#if DEBUG_MAIN
338printf("\n[sort] main completes sequencial init at cycle %d\n",
339(unsigned int)seq_end_cycle );
340#endif
341
342    // create and execute the working threads
343    if( pthread_parallel_create( root_level,
344                                 &sort ) )
345    {
346        printf("\n[sort] ERROR : cannot create threads\n");
347        exit( 0 );
348    }
349
350    ////////////////////////////
351    get_cycle( &para_end_cycle );
352
353#if DEBUG_main
354printf("\n[sort] main completes parallel sort at cycle %d\n", 
355(unsigned int)para_end_cycle );
356#endif
357
358    // destroy barrier
359    pthread_barrier_destroy( &barrier );
360
361#if DISPLAY_ARRAY
362    printf("\n*** array after merge %d\n", i );
363    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
364#endif
365
366#if CHECK_RESULT
367    int    success = 1;
368    int *  res_array = ( (threads ==   2) ||
369                         (threads ==   8) || 
370                         (threads ==  32) || 
371                         (threads == 128) || 
372                         (threads == 512) ) ? array1 : array0;
373
374    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
375    {
376        if ( res_array[n] > res_array[n+1] )
377        {
378            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
379            n , res_array[n] , n+1 , res_array[n+1] );
380            success = 0;
381            break;
382        }
383    }
384
385    if ( success ) printf("\n[sort] success\n");
386    else           printf("\n[sort] failure\n");
387#endif
388
389#if INSTRUMENTATION
390    char               name[64];
391    char               path[128];
392    unsigned long long instru_cycle;
393
394    // build file name
395    if( USE_DQT_BARRIER )
396    snprintf( name , 64 , "p_sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
397    else
398    snprintf( name , 64 , "p_sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
399
400    // build file pathname
401    snprintf( path , 128 , "home/%s" , name );
402
403    // compute results
404    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
405    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
406
407    // display results on process terminal
408    printf("\n----- %s -----\n"
409           " - sequencial : %d cycles\n"
410           " - parallel   : %d cycles\n", 
411           name, sequencial, parallel );
412
413    // open file
414    get_cycle( &instru_cycle );
415    FILE * stream = fopen( path , NULL );
416
417    if( stream == NULL )
418    {
419        printf("\n[sort] ERROR : cannot open instrumentation file <%s>\n", path );
420        exit(0);
421    }
422
423    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
424
425#if IDBG
426idbg();
427#endif
428
429    // register results to file
430    get_cycle( &instru_cycle );
431    int ret = fprintf( stream , "\n----- %s -----\n"
432                                " - sequencial : %d cycles\n"
433                                " - parallel   : %d cycles\n", name, sequencial, parallel );
434    if( ret < 0 )
435    {
436        printf("\n[sort] ERROR : cannot write to instrumentation file <%s>\n", path );
437        exit(0);
438    }
439
440    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
441
442#if IDBG
443idbg();
444#endif
445
446    // close instrumentation file
447    get_cycle( &instru_cycle );
448    ret = fclose( stream );
449
450    if( ret )
451    {
452        printf("\n[sort] ERROR : cannot close instrumentation file <%s>\n", path );
453        exit(0);
454    }
455
456    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
457
458#endif
459
460    exit( 0 );
461
462    return 0;
463
464}  // end main()
465
466/*
467vim: tabstop=4 : shiftwidth=4 : expandtab
468*/
Note: See TracBrowser for help on using the repository browser.