///////////////////////////////////////////////////////////////////////////////////////////// // File : router.c // Date : november 2014 // author : Alain Greiner ///////////////////////////////////////////////////////////////////////////////////////////// // This multi-threaded application illustrates "task-farm" parallelism. // It is described as a TCG (Task and Communication Graph). // It contains 2 + N threads : one "producer", one "consumer" and N "router"), // plus the "main" thread, that makes the global initialisation, launches the other // threads, and exit. // It contains 2 MWMR channels per cluster : "fifo_in" and "fifo_out", that are // allocated by the main thread in the user heaps distributed in all clusters. // - The "producer" task writes token in all the "fifo_in". // - The N "router" tasks read token from "fifo_in" and write them into "fifo_out". // - The "consumer" task read token from "fifo_out" and displays instrumentation results. // Token are indexed (by the producer) from 0 to NMAX-1. // The router task contains a random delay emulating a variable processing time. // For instrumentation, the "consumer_tab" array is filled by the "consumer" task. // Each entry contain the arrival order to the consumer task. ///////////////////////////////////////////////////////////////////////////////////////////// #include "stdio.h" #include "mwmr_channel.h" #include "mapping_info.h" #include "hard_config.h" #include "malloc.h" #define VERBOSE 1 #define NMAX 32 // total number of token #define DEPTH 32 // MWMR channels depth // macro to use a shared TTY #define printf(...); { lock_acquire( &tty_lock ); \ giet_tty_printf(__VA_ARGS__); \ lock_release( &tty_lock ); } // lock protecting shared TTY user_lock_t tty_lock; // arguments for the producer, consumer, and router functions typedef struct { mwmr_channel_t* pin; // pointer on the MWMR input fifo mwmr_channel_t* pout; // pointer on the MWMR output fifo unsigned int x_size; // number of clusters in a row unsigned int y_size; // number of clusters in a column } args_t; // arrays of pointers mwmr_channel_t* fifo_in[16][16]; mwmr_channel_t* fifo_out[16][16]; ///////////////////////////////////////////// __attribute__ ((constructor)) void producer() { unsigned int x = 0; // destination cluster coordinate unsigned int y = 0; // destination cluster coordinate unsigned int token = 0; // token value // get plat-form parameters unsigned int x_size; // number of clusters in a row unsigned int y_size; // number of clusters in a column unsigned int nprocs; // unused giet_procs_number( &x_size , &y_size , &nprocs ); // loop on the clusters while ( token < NMAX ) { // try to write a token in fifo_in[x,y] if ( nb_mwmr_write( fifo_in[x][y] , &token , 1 ) == 1 ) { if ( VERBOSE ) printf("[PRODUCER] token %d sent to cluster(%d,%d)" " at cycle %d\n", token , x , y , giet_proctime() ); token++; } // compute next cluster coordinates x++; if ( x == x_size ) { x = 0; y++; if ( y == y_size ) y = 0; } } giet_pthread_exit( "Producer task completed"); } // end producer() ///////////////////////////////////////////// __attribute__ ((constructor)) void consumer() { unsigned int x = 0; // source cluster coordinate unsigned int y = 0; // source cluster coordinate unsigned int n = 0; // index of received token unsigned int token; // token value unsigned int consumer_tab[NMAX]; // received token array // get plat-form parameters unsigned int x_size; // number of clusters in a row unsigned int y_size; // number of clusters in a column unsigned int nprocs; // unused giet_procs_number( &x_size , &y_size , &nprocs ); // loop on the clusters while ( n < NMAX ) { // try to read a token from fifo_out[x,y] if ( nb_mwmr_read( fifo_out[x][y] , &token , 1 ) == 1 ) { consumer_tab[n] = token; n++; if ( VERBOSE ) printf("[CONSUMER] token %d received at cycle %d\n", token , giet_proctime() ); } // compute next cluster coordinates x++; if ( x == x_size ) { x = 0; y++; if ( y == y_size ) y = 0; } } // instrumentation display giet_tty_printf("\n[CONSUMER] displays instrumentation results\n"); for( n = 0 ; n < NMAX ; n++ ) { giet_tty_printf(" - arrival = %d / value = %d\n", n , consumer_tab[n] ); } giet_pthread_exit( "Consumer task completed"); } // end consumer() //////////////////////////////////////////// __attribute__ ((constructor)) void compute() { unsigned int token; // token value unsigned int count; // tempo // get proc coordinates unsigned int x; unsigned int y; unsigned int p; giet_proc_xyp( &x , &y , &p ); // main loop while(1) { mwmr_read( fifo_in[x][y] , &token , 1 ); for ( count = 0 ; count < (giet_rand() << 2) ; count++ ) asm volatile ( "nop" ); mwmr_write( fifo_out[x][y] , &token , 1 ); if ( VERBOSE ) printf("[COMPUTE] token %d handled at cycle %d on P[%d,%d,%d]\n", token , giet_proctime() , x , y , p ); } } // end compute() ///////////////////////////////////////// __attribute__ ((constructor)) void main() { // get plat-form parameters unsigned int x_size; // number of clusters in a row unsigned int y_size; // number of clusters in a column unsigned int nprocs; // number of processors per cluster giet_procs_number( &x_size , &y_size , &nprocs ); // shared TTY allocation giet_tty_alloc( 1 ); lock_init( &tty_lock); // check plat-form parameters giet_pthread_assert( ((nprocs >= 3) && (nprocs <= 8)), "[ROUTER ERROR] nprocs per cluster must be in [3...8]"); giet_pthread_assert( ((x_size >= 1) && (x_size <= 16)), "[ROUTER ERROR] x_size must be in [1...16]"); giet_pthread_assert( ((y_size >= 1) && (y_size <= 16)), "[ROUTER ERROR] y_size must be in [1...16]"); // index for loops unsigned int x; unsigned int y; unsigned int n; // distributed heap initialisation, plus // MWMR channels and associated buffers allocation for ( x = 0 ; x < x_size ; x++ ) { for ( y = 0 ; y < y_size ; y++ ) { heap_init( x , y ); // allocate MWMR channel descriptors in cluster[x][y] fifo_in[x][y] = remote_malloc( sizeof( mwmr_channel_t ) , x , y ); fifo_out[x][y] = remote_malloc( sizeof( mwmr_channel_t ) , x , y ); // allocate data buffers in cluster[x][y] unsigned int* buf_in = remote_malloc( 4*DEPTH , x , y ); unsigned int* buf_out = remote_malloc( 4*DEPTH , x , y ); // initialize MWMR channels mwmr_init( fifo_in[x][y] , buf_in , 1 , DEPTH ); mwmr_init( fifo_out[x][y] , buf_out , 1 , DEPTH ); } } printf("\n[ROUTER] main completes initialisation at cycle %d for %d cores\n", giet_proctime(), (x_size * y_size * nprocs) ); // thread index and function for pthread_create() pthread_t trdid; void* function; // launch producer, consumer and router threads for ( x = 0 ; x < x_size ; x++ ) { for ( y = 0 ; y < y_size ; y++ ) { for ( n = 0 ; n < nprocs ; n++ ) { if ( (x==0) && (y==0) && (n==0) ) function = &producer; else if ( (x==0) && (y==0) && (n==1) ) function = &consumer; else function = &compute; if ( giet_pthread_create( &trdid, NULL, // no attribute function, NULL ) ) // no argument { printf("\n[ROUTER ERROR] launching thread on P[%d,%d,%d]\n", x, y, n ); giet_pthread_exit( NULL ); } } } } giet_pthread_exit( "main completed" ); } // end main()