/////////////////////////////////////////////////////////////////////////////////////////
// File   : mjpeg.c
// Date   : october 2015
// Author : Alain Greiner
/////////////////////////////////////////////////////////////////////////////////////////
// This multi-threaded application illustrates "pipe-line" parallelism and the message
// passing programming model, on top of the POSIX threads API.
// It performs the parallel decompression of an MJPEG bitstream contained in a file.
// The application is described as a TCG (Task and Communication Graph), and all
// communications between threads use MWMR channels.
// It uses the chained buffer DMA component to display the images on the graphic display.
// It contains 5 types of threads, plus the "main" thread that makes the initialisation,
// dispatches the byte stream to the various pipelines, and makes the instrumentation,
// and 7 types of MWMR communication channels:
// - the main thread is only mapped in cluster[0][0], but all other threads
//   (DEMUX, VLD, IQZZ, IDCT, LIBU) are replicated in all clusters.
// - all MWMR channels are replicated in all clusters.
// The number of clusters cannot be larger than 16*16.
// The number of processors per cluster is not constrained.
// The frame buffer size must fit the decompressed image size.
// It uses one TTY terminal shared by all tasks.
/////////////////////////////////////////////////////////////////////////////////////////

#include <stdio.h>
#include <string.h>
#include <user_lock.h>
#include <mwmr_channel.h>
#include <malloc.h>
#include "mjpeg.h"
#include <mapping_info.h>      // for coprocessor types and modes

// macro to use a shared TTY
#define PRINTF(...) do { lock_acquire( &tty_lock ); \
                         giet_tty_printf(__VA_ARGS__); \
                         lock_release( &tty_lock ); } while(0)

///////////////////////////////////////////////
//           Global variables
///////////////////////////////////////////////

uint32_t fd;                              // file descriptor for the file containing the MJPEG stream

// arrays of pointers on MWMR channels
mwmr_channel_t*  main_2_demux[256];       // one per cluster
mwmr_channel_t*  demux_2_vld_data[256];   // one per cluster
mwmr_channel_t*  demux_2_vld_huff[256];   // one per cluster
mwmr_channel_t*  demux_2_iqzz[256];       // one per cluster
mwmr_channel_t*  vld_2_iqzz[256];         // one per cluster
mwmr_channel_t*  iqzz_2_idct[256];        // one per cluster
mwmr_channel_t*  idct_2_libu[256];        // one per cluster

// thread trdid ( for pthread_create() and pthread_join() )
pthread_t  trdid_demux[256];              // one per cluster
pthread_t  trdid_vld[256];                // one per cluster
pthread_t  trdid_iqzz[256];               // one per cluster
pthread_t  trdid_idct[256];               // one per cluster
pthread_t  trdid_libu[256];               // one per cluster

user_lock_t  tty_lock;                    // lock protecting shared TTY

uint8_t*   cma_buf[256];                  // CMA buffers (one per cluster)
void*      cma_sts[256];                  // CMA buffers status

uint32_t   fbf_width;                     // Frame Buffer width
uint32_t   fbf_height;                    // Frame Buffer height

uint32_t   nblocks_h;                     // number of blocks in a column
uint32_t   nblocks_w;                     // number of blocks in a row

uint32_t   date[MAX_IMAGES];              // date of libu completion

////////////////////////////////////////////////
//      declare thread functions
////////////////////////////////////////////////

extern void demux( uint32_t index );
extern void vld  ( uint32_t index );
extern void iqzz ( uint32_t index );
extern void idct ( uint32_t index );
extern void libu ( uint32_t index );
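
/////////////////////////////////////////////////////////////////////////////////////////
// Per-cluster pipeline implied by the channels and thread functions declared above
// (one instance is replicated in each cluster[x][y]):
//
//   main  ==(main_2_demux)==>  DEMUX ==(demux_2_vld_data + demux_2_vld_huff)==>  VLD
//   DEMUX ==(demux_2_iqzz)==>  IQZZ  <==(vld_2_iqzz)==  VLD
//   IQZZ  ==(iqzz_2_idct)==>   IDCT  ==(idct_2_libu)==> LIBU
//
// The LIBU thread builds the decompressed image in the local CMA buffer (cma_buf),
// which the chained buffer DMA transfers to the frame buffer.
/////////////////////////////////////////////////////////////////////////////////////////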

/////////////////////////////////////////
__attribute__ ((constructor)) void main()
/////////////////////////////////////////
{
    // get platform parameters
    uint32_t x_size;
    uint32_t y_size;
    uint32_t nprocs;
    giet_procs_number( &x_size , &y_size , &nprocs );

    // shared TTY allocation
    giet_tty_alloc( 1 );
    lock_init( &tty_lock );

    // check platform parameters
    giet_pthread_assert( (nprocs <= 6),
                         "[MJPEG ERROR] nprocs cannot be larger than 6");

    giet_pthread_assert( (x_size <= 16),
                         "[MJPEG ERROR] x_size cannot be larger than 16");

    giet_pthread_assert( (y_size <= 16),
                         "[MJPEG ERROR] y_size cannot be larger than 16");

    giet_pthread_assert( (MAX_IMAGES >= (x_size*y_size)),
                         "[MJPEG ERROR] number of images cannot be smaller than x_size * y_size");

    // check frame buffer size
    giet_fbf_size( &fbf_width , &fbf_height );
    giet_pthread_assert( ((fbf_width & 0x7) == 0) && ((fbf_height & 0x7) == 0) ,
                         "[MJPEG ERROR] image width and height must be multiple of 8");

    // request frame buffer and CMA channel allocation
    giet_fbf_alloc();
    giet_fbf_cma_alloc( x_size * y_size );

    // file name and image size acquisition
    char      file_pathname[256];
    uint32_t  image_width;
    uint32_t  image_height;

    if ( INTERACTIVE_MODE )
    {
        PRINTF("\n[MJPEG] enter path for JPEG stream file (default is plan_48.mjpg)\n> ");
        giet_tty_gets( file_pathname , 256 );

        if ( file_pathname[0] == 0 )
        {
            strcpy( file_pathname , "/misc/plan_48.mjpg" );
            image_width  = 48;
            image_height = 48;
        }
        else
        {
            PRINTF("\n[MJPEG] enter image width\n> ");
            giet_tty_getw( &image_width );
            PRINTF("\n[MJPEG] enter image height\n> ");
            giet_tty_getw( &image_height );
            PRINTF("\n");
        }
    }
    else
    {
        strcpy( file_pathname , "/misc/plan_48.mjpg" );
        image_width  = 48;
        image_height = 48;
    }

    giet_pthread_assert( (image_width == fbf_width) && (image_height == fbf_height) ,
                         "[MJPEG ERROR] image size doesn't fit frame buffer size");

    if ( USE_DCT_COPROC )
    {
        PRINTF("\n\n[MJPEG] stream %s / %d clusters / %d cores / DCT COPROC\n\n",
               file_pathname , x_size*y_size , nprocs );
    }
    else
    {
        PRINTF("\n\n[MJPEG] stream %s / %d clusters / %d cores / NO DCT COPROC\n\n",
               file_pathname , x_size*y_size , nprocs );
    }

    // compute nblocks_h & nblocks_w
    nblocks_w = fbf_width / 8;
    nblocks_h = fbf_height / 8;

    // open the file containing the MJPEG bit stream ( global fd descriptor )
    fd = giet_fat_open( file_pathname , 0 );
    giet_pthread_assert( ((int)fd >= 0),
                         "[MJPEG ERROR] cannot open MJPEG stream file");

    // index for loops
    uint32_t  x;
    uint32_t  y;
    uint32_t  n;

    uint32_t* buffer;
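    ///////////////////////////////////////////////////////////////////////////////////
    // The loop below calls remote_malloc( ..., x, y ) so that each channel descriptor
    // and its data buffer are placed in the heap of cluster[x][y] : the traffic of
    // each per-cluster pipeline thus stays in its own cluster (the thread mapping
    // itself is defined in the mjpeg.py file).
    // As used here, each channel holds DEPTH items of one 32-bit word (third and
    // fourth arguments of mwmr_init()), matching the 4 * DEPTH bytes allocated for
    // each data buffer.
    ///////////////////////////////////////////////////////////////////////////////////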
    // initialise distributed heap,
    // allocate MWMR channels,
    // allocate buffers for CMA
    for ( x = 0 ; x < x_size ; x++ )
    {
        for ( y = 0 ; y < y_size ; y++ )
        {
            uint32_t index = x*y_size + y;

            // initialise heap[x][y]
            heap_init( x , y );

            // allocate MWMR channels in cluster[x][y]
            main_2_demux[index] = remote_malloc( sizeof( mwmr_channel_t ) , x , y );
            buffer              = remote_malloc( 4 * MAIN_2_DEMUX_DEPTH , x , y );
            mwmr_init( main_2_demux[index] , buffer , 1 , MAIN_2_DEMUX_DEPTH );

            demux_2_vld_data[index] = remote_malloc( sizeof( mwmr_channel_t ) , x , y );
            buffer                  = remote_malloc( 4 * DEMUX_2_VLD_DATA_DEPTH , x , y );
            mwmr_init( demux_2_vld_data[index] , buffer , 1 , DEMUX_2_VLD_DATA_DEPTH );

            demux_2_vld_huff[index] = remote_malloc( sizeof( mwmr_channel_t ) , x , y );
            buffer                  = remote_malloc( 4 * DEMUX_2_VLD_HUFF_DEPTH , x , y );
            mwmr_init( demux_2_vld_huff[index] , buffer , 1 , DEMUX_2_VLD_HUFF_DEPTH );

            demux_2_iqzz[index] = remote_malloc( sizeof( mwmr_channel_t ) , x , y );
            buffer              = remote_malloc( 4 * DEMUX_2_IQZZ_DEPTH , x , y );
            mwmr_init( demux_2_iqzz[index] , buffer , 1 , DEMUX_2_IQZZ_DEPTH );

            vld_2_iqzz[index] = remote_malloc( sizeof( mwmr_channel_t ) , x , y );
            buffer            = remote_malloc( 4 * VLD_2_IQZZ_DEPTH , x , y );
            mwmr_init( vld_2_iqzz[index] , buffer , 1 , VLD_2_IQZZ_DEPTH );

            iqzz_2_idct[index] = remote_malloc( sizeof( mwmr_channel_t ) , x , y );
            buffer             = remote_malloc( 4 * IQZZ_2_IDCT_DEPTH , x , y );
            mwmr_init( iqzz_2_idct[index] , buffer , 1 , IQZZ_2_IDCT_DEPTH );

            idct_2_libu[index] = remote_malloc( sizeof( mwmr_channel_t ) , x , y );
            buffer             = remote_malloc( 4 * IDCT_2_LIBU_DEPTH , x , y );
            mwmr_init( idct_2_libu[index] , buffer , 1 , IDCT_2_LIBU_DEPTH );

            // allocate and register CMA buffers in cluster[x][y]
            cma_buf[index] = remote_malloc( fbf_width * fbf_height , x , y );
            cma_sts[index] = remote_malloc( 64 , x , y );
            giet_fbf_cma_init_buf( index , cma_buf[index] , cma_sts[index] );
        }
    }

    // start CMA channel
    giet_fbf_cma_start();

    // display MWMR channels addresses
    mwmr_channel_t* pc;
    for ( n = 0 ; n < x_size*y_size ; n++ )
    {
        pc = main_2_demux[n];
        PRINTF(" - main_2_demux[%d] = %x / &lock = %x / &buf = %x / size = %d\n",
               n, pc, (uint32_t)&pc->lock, (uint32_t)pc->data, pc->depth<<2 );
        pc = demux_2_vld_data[n];
        PRINTF(" - demux_2_vld[%d] = %x / &lock = %x / &buf = %x / size = %d\n",
               n, pc, (uint32_t)&pc->lock, (uint32_t)pc->data, pc->depth<<2 );
        pc = vld_2_iqzz[n];
        PRINTF(" - vld_2_iqzz[%d] = %x / &lock = %x / &buf = %x / size = %d\n",
               n, pc, (uint32_t)&pc->lock, (uint32_t)pc->data, pc->depth<<2 );
        pc = iqzz_2_idct[n];
        PRINTF(" - iqzz_2_idct[%d] = %x / &lock = %x / &buf = %x / size = %d\n",
               n, pc, (uint32_t)&pc->lock, (uint32_t)pc->data, pc->depth<<2 );
        pc = idct_2_libu[n];
        PRINTF(" - idct_2_libu[%d] = %x / &lock = %x / &buf = %x / size = %d\n",
               n, pc, (uint32_t)&pc->lock, (uint32_t)pc->data, pc->depth<<2 );
    }

    // launch all threads : precise mapping is defined in the mjpeg.py file
    uint32_t index;
    for ( x = 0 ; x < x_size ; x++ )
    {
        for ( y = 0 ; y < y_size ; y++ )
        {
            index = x * y_size + y;

            // DEMUX
            if ( giet_pthread_create( &trdid_demux[index], NULL, &demux , (void*)index ) )
                giet_pthread_exit( "error launching thread demux\n");

            // VLD
            if ( giet_pthread_create( &trdid_vld[index], NULL, &vld , (void*)index ) )
                giet_pthread_exit( "error launching thread vld\n");

            // IQZZ
            if ( giet_pthread_create( &trdid_iqzz[index], NULL, &iqzz , (void*)index ) )
                giet_pthread_exit( "error launching thread iqzz\n");

            // IDCT
            if ( USE_DCT_COPROC )   // allocate, initialise, and start hardware coprocessor
            {
                giet_coproc_channel_t in_channel;
                giet_coproc_channel_t out_channel;

                uint32_t cluster_xy  = (x<<4) + y;
                uint32_t coproc_type = 2;
                uint32_t info;

                // allocate DCT coprocessor
                giet_coproc_alloc( cluster_xy , coproc_type , &info );

                // initialize channels
                in_channel.channel_mode = MODE_MWMR;
                in_channel.buffer_size  = (iqzz_2_idct[index]->depth)<<2;
                in_channel.buffer_vaddr = (uint32_t)(iqzz_2_idct[index]->data);
                in_channel.status_vaddr = (uint32_t)(&iqzz_2_idct[index]->sts);
                in_channel.lock_vaddr   = (uint32_t)(&iqzz_2_idct[index]->lock);
                giet_coproc_channel_init( cluster_xy , coproc_type , 0 , &in_channel );

                out_channel.channel_mode = MODE_MWMR;
                out_channel.buffer_size  = (idct_2_libu[index]->depth)<<2;
                out_channel.buffer_vaddr = (uint32_t)(idct_2_libu[index]->data);
                out_channel.status_vaddr = (uint32_t)(&idct_2_libu[index]->sts);
                out_channel.lock_vaddr   = (uint32_t)(&idct_2_libu[index]->lock);
                giet_coproc_channel_init( cluster_xy , coproc_type , 1 , &out_channel );

                // start coprocessor
                giet_coproc_run( cluster_xy , coproc_type );
            }
            else                    // launches a software thread
            {
                if ( giet_pthread_create( &trdid_idct[index], NULL, &idct , (void*)index ) )
                    giet_pthread_exit( "error launching thread idct\n");
            }

            // LIBU
            if ( giet_pthread_create( &trdid_libu[index], NULL, &libu , (void*)index ) )
                giet_pthread_exit( "error launching thread libu\n");
        }
    }
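    /////////////////////////////////////////////////////////////////////////////////////
    // Consumer side (for reference only) : the DEMUX thread of cluster[index] pulls
    // the compressed bytes written below from main_2_demux[index]. A minimal sketch
    // of that read loop, assuming the blocking, word-based mwmr_read() primitive of
    // the MWMR library (the actual demux() code is compiled separately and may use
    // the bufio helpers instead):
    //
    //     uint32_t word;
    //     mwmr_read( main_2_demux[index] , &word , 1 );  // blocking read of one word
    //     // the 4 bytes of <word> are then scanned by the JPEG parser
    /////////////////////////////////////////////////////////////////////////////////////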
    /////////////////////////////////////////////////////////////////////////////////////
    // Dispatch the byte stream to the DEMUX threads, one compressed image per cluster.
    // The main thread transfers the stream from the file identified by the global fd
    // descriptor to a 1024 bytes local buffer. It analyses the stream to detect the
    // End-of-Image (EOI) markers. All the bytes corresponding to a single image, from
    // the first byte to the EOI marker included, are written into the
    // main_2_demux[index] channel, in increasing order of the cluster index.
    /////////////////////////////////////////////////////////////////////////////////////

    // allocate input buffer : 1024 bytes
    uint8_t bufin[1024];

    // allocate output bufio to access output MWMR channels : 64 bytes == 16 words
    mwmr_bufio_t bufio;
    uint8_t      bufout[64];
    mwmr_bufio_init( &bufio , bufout , 64 , 0 , main_2_demux[0] );

    uint32_t image;         // image index
    uint32_t cluster;       // cluster index / modulo x_size*y_size
    uint32_t ptr;           // byte pointer in input buffer
    uint32_t eoi_found;     // boolean : End-of-Image found
    uint32_t ff_found;      // boolean : 0xFF value found
    uint32_t bytes_count;   // number of bytes in compressed image

    // initialise image index, cluster index, and bufin pointer
    image   = 0;
    cluster = 0;
    ptr     = 0;
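    /////////////////////////////////////////////////////////////////////////////////////
    // The scanner below is a two-state machine : ff_found remembers that the previous
    // byte was 0xFF; when the next byte is 0xD9, the 0xFFD9 EOI marker has been found,
    // the current image is complete, and the bufio is flushed to the MWMR channel.
    // Every scanned byte (including the two bytes of the EOI marker) is forwarded to
    // the main_2_demux channel of the target cluster as it is analysed.
    /////////////////////////////////////////////////////////////////////////////////////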
    while( image < MAX_IMAGES )     // one compressed image per iteration
    {
        // initialise image specific variables
        eoi_found   = 0;
        ff_found    = 0;
        bytes_count = 0;

        // re-initialise the destination bufio for each image
        bufio.mwmr = main_2_demux[cluster];

        // scan bit stream until EOI found
        // transfer one byte per iteration from input buffer to output bufio
        while ( eoi_found == 0 )
        {
            // - transfer 1024 bytes from file to input buffer when input buffer empty.
            // - return to first byte in input file when EOF found,
            //   to emulate an infinite stream of images.
            if ( ptr == 0 )
            {
                uint32_t r = giet_fat_read( fd , bufin , 1024 );
                if ( r < 1024 )
                {
                    giet_fat_lseek( fd , 0 , SEEK_SET );
                    giet_fat_read( fd , bufin + r , 1024 - r );
                }
            }

            // transfer one byte from input buffer to output bufio
            mwmr_bufio_write_byte( &bufio , bufin[ptr] );

            // analyse this byte to find the EOI marker 0xFFD9
            // flush the output bufio when EOI found
            if ( ff_found )             // possible End of Image
            {
                ff_found = 0;
                if ( bufin[ptr] == 0xD9 )     // End of Image found
                {
                    // exit current image
                    eoi_found = 1;

                    // flush output bufio
                    mwmr_bufio_flush( &bufio );
                }
            }
            else                        // test if first byte of a marker
            {
                if ( bufin[ptr] == 0xFF ) ff_found = 1;
            }

            // increment input buffer pointer modulo 1024
            ptr++;
            if ( ptr == 1024 ) ptr = 0;

            // increment bytes_count for current image
            bytes_count++;

        }  // end while (eoi)

#if DEBUG_MAIN
        PRINTF("\nMAIN send image %d to cluster %d at cycle %d : %d bytes\n",
               image , cluster , giet_proctime() , bytes_count );
#endif

        // increment image index
        image++;

        // increment cluster index modulo (x_size*y_size)
        cluster++;
        if (cluster == x_size * y_size) cluster = 0;

    } // end while on images

    /////////////////////////////////////////////////////////////////////////////////////
    // wait all threads completion
    /////////////////////////////////////////////////////////////////////////////////////

    for ( x = 0 ; x < x_size ; x++ )
    {
        for ( y = 0 ; y < y_size ; y++ )
        {
            index = x * y_size + y;

            if ( giet_pthread_join( trdid_demux[index] , NULL ) )
                PRINTF("\n[MJPEG ERROR] calling giet_pthread_join() for demux[%d]\n", index );

            if ( giet_pthread_join( trdid_vld[index] , NULL ) )
                PRINTF("\n[MJPEG ERROR] calling giet_pthread_join() for vld[%d]\n", index );

            if ( giet_pthread_join( trdid_iqzz[index] , NULL ) )
                PRINTF("\n[MJPEG ERROR] calling giet_pthread_join() for iqzz[%d]\n", index );

            if ( USE_DCT_COPROC == 0 )
            {
                if ( giet_pthread_join( trdid_idct[index] , NULL ) )
                    PRINTF("\n[MJPEG ERROR] calling giet_pthread_join() for idct[%d]\n", index );
            }

            if ( giet_pthread_join( trdid_libu[index] , NULL ) )
                PRINTF("\n[MJPEG ERROR] calling giet_pthread_join() for libu[%d]\n", index );

            if ( USE_DCT_COPROC )
            {
                uint32_t cluster_xy  = (x<<4) + y;
                uint32_t coproc_type = 2;
                giet_coproc_release( cluster_xy , coproc_type );
            }
        }
    }

    /////////////////////////////////////////////////////////////////////////////////////
    // display and save instrumentation results
    /////////////////////////////////////////////////////////////////////////////////////

    // display on TTY
    PRINTF("\n[MJPEG] Instrumentation Results\n" );
    for ( image = 0 ; image < MAX_IMAGES ; image++ )
        PRINTF(" - Image %d : completed at cycle %d\n", image , date[image]);

    // save on disk
    int fdout = giet_fat_open( "/home/mjpeg_instrumentation" , O_CREAT );
    if ( fdout < 0 )
        PRINTF("\n[MJPEG ERROR] cannot open file /home/mjpeg_instrumentation\n");

    int ret = giet_fat_lseek( fdout , 0 , SEEK_END );
    if ( ret < 0 )
        PRINTF("\n[MJPEG ERROR] cannot seek file /home/mjpeg_instrumentation\n");

    if ( USE_DCT_COPROC )
    {
        giet_fat_fprintf( fdout, "\n*** stream %s / %d clusters / %d cores / DCT COPROC\n",
                          file_pathname , x_size*y_size , nprocs );
    }
    else
    {
        giet_fat_fprintf( fdout, "\n*** stream %s / %d clusters / %d cores / NO DCT COPROC\n",
                          file_pathname , x_size*y_size , nprocs );
    }

    for ( image = 0 ; image < MAX_IMAGES ; image++ )
    {
        giet_fat_fprintf( fdout, " - Image %d : completed at cycle %d\n",
                          image , date[image]);
    }

    // completed
    giet_pthread_exit( "main completed" );

} // end main()