source: soft/giet_vm/applications/classif/main.c @ 492

Last change on this file since 492 was 488, checked in by alain, 10 years ago

1) Updating the classif application to introduce a "store" task in each cluster.
2) Updating the convol application to use the new sbt_barrier_init() prototype.

File size: 21.7 KB
Line 
1/////////////////////////////////////////////////////////////////////////////////////////
2// File   : main.c   (for classif application)
3// Date   : november 2014
4// author : Alain Greiner
5/////////////////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application takes a stream of Gigabit Ethernet packets,
7// and makes packet analysis and classification, based on the source MAC address.
8// It uses the NIC peripheral, and the distributed kernel chbufs accessed by the CMA
9// component to receive and send packets on the Gigabit Ethernet port.
10//
11// This application is described as a TCG (Task and Communication Graph) containing
12// (N+2) tasks per cluster:
13// - one "load" task
14// - N "analyse" tasks
15// - one "store" task
16// The 4 Kbytes containers are distributed (N+2 containers per cluster):
17// - one RX container (part of the kernel rx_chbuf), in the kernel heap.
18// - one TX container (part of the kernel tx-chbuf), in the kernel heap.
19// - N working containers (one per analysis task), in the user heap.
20// In each cluster, the "load", "analyse" and "store" tasks communicate through
21// three local MWMR fifos:
22// - fifo_l2a : transfer a full container from "load" to "analyse" task.
23// - fifo_a2s : transfer a full container from "analyse" to "store" task.
24// - fifo_s2l : transfer an empty container from "store" to "load" task.
25// For each fifo, one item is a 32 bits word defining the index of an
26// available working container.
27// The pointers on the working containers, and the pointers on the MWMR fifos
28// are global arrays stored in cluster[0][0].
29// Each local MWMR fifo has a depth of (NB_PROCS_MAX - 2) items (one item = one container index).
30// The MWMR fifo descriptors array is defined as a global variable in cluster[0][0].
31//
32// Initialisation is done in two steps by the "load" tasks:
33// - Task "load" in cluster[0][0] initialises NIC & CMA channel, and initialises
34//   the barrier between all "load" tasks. Other "load" tasks are waiting on the
35//   load_sync synchronisation variable.
36// - In each cluster[x][y], the "load" task allocates the working containers
37//   and the MWMR fifos descriptors in the local heap.
38//   The "analyse" and "store" tasks are waiting on the local_sync[x][y] variables.
39//
40// Instrumentation results display is done by the "store" task in cluster[0][0]
41// when all "store" tasks have completed the number of containers specified by the
42// CONTAINERS_MAX parameter.
43//     
44// When initialisation is completed, all tasks loop on containers:
45// 1) The "load" task gets an empty working container from the fifo_s2l,
46//    transfers one container from the kernel rx_chbuf to this user container,
47//    and transfers ownership of this container to one "analyse" task by writing
48//    into the fifo_l2a.
49//
50// 2) The "analyse" task gets one working container from the fifo_l2a, analyses
51//    each packet header, computes the packet type (depending on the SRC MAC address),
52//    increments the corresponding classification counter, and swaps the SRC
53//    and the DST MAC addresses for TX transmission.
54//
55// 3) The "store" task gets a full working container from the fifo_a2s,
56//    transfers this user container content to the kernel tx_chbuf,
57//    and transfers ownership of this empty container to the "load" task by writing
58//    into the fifo_s2l.
59//
60// This application uses the following hardware parameters (hard_config.h file):
61// - X_SIZE       : number of clusters in a row
62// - Y_SIZE       : number of clusters in a column
63// - NB_PROCS_MAX : number of processors per cluster
64/////////////////////////////////////////////////////////////////////////////////////////
65
66#include "stdio.h"
67#include "barrier.h"
68#include "malloc.h"
69#include "user_lock.h"
70#include "mwmr_channel.h"
71#include "hard_config.h"
72
73#define CONTAINERS_MAX  5
74#define VERBOSE_ANALYSE 1
75#define ANALYSIS_TASKS  (NB_PROCS_MAX - 2)
76
77/////////////////////////////////////////////////////////////////////////////////////////
78//    Global variables
79// The MWMR channels (descriptors and buffers), as well as the working containers
80// used by the "analysis" tasks are distributed in clusters.
81// But the pointers on these distributed structures are shared arrays
82// stored in cluster[0][0].
83/////////////////////////////////////////////////////////////////////////////////////////
84
85// pointers on distributed temp[x][y][n] containers
86unsigned int*       container[X_SIZE][Y_SIZE][ANALYSIS_TASKS]; 
87
88// pointers on distributed mwmr fifos containing : temp[x][y][l] container descriptors
89mwmr_channel_t*     mwmr_l2a[X_SIZE][Y_SIZE]; 
90mwmr_channel_t*     mwmr_a2s[X_SIZE][Y_SIZE];
91mwmr_channel_t*     mwmr_s2l[X_SIZE][Y_SIZE]; 
92
93// local synchros signaling local MWMR fifos initialisation completion
94unsigned int        local_sync[X_SIZE][Y_SIZE]; 
95
96// global synchro signaling global initialisation completion
97unsigned int        load_sync  = 0; 
98unsigned int        store_sync = 0; 
99
100// instrumentation counters
101unsigned int        counter[16];
102
103// distributed barriers (between "load" and "store" tasks)
104giet_sbt_barrier_t  rx_barrier;
105giet_sbt_barrier_t  tx_barrier;
106
107// NIC_RX and NIC_TX channel index
108unsigned int        nic_rx_channel;
109unsigned int        nic_tx_channel;
110
111/////////////////////////////////////////
112__attribute__ ((constructor)) void load()
113/////////////////////////////////////////
114{
115    // each "load" task get processor identifiers
116    unsigned int    x;
117    unsigned int    y;
118    unsigned int    l;
119    giet_proc_xyp( &x, &y, &l );
120
121    // "load" task[0][0] initialises barrier between load tasks,
122    // allocates the NIC & CMA RX channels, and start the NIC_CMA RX transfer.
123    // Other "load" tasks wait completion
124    if ( (x==0) && (y==0) )
125    {
126        giet_shr_printf("\n*** Task load on P[%d][%d][%d] starts at cycle %d\n",
127                        x , y , l , giet_proctime() );
128 
129        sbt_barrier_init( &rx_barrier, X_SIZE*Y_SIZE , 1 );
130        nic_rx_channel = giet_nic_rx_alloc();
131        giet_nic_rx_start( nic_rx_channel );
132        load_sync = 1;
133    }
134    else
135    {
136        while ( load_sync == 0 ) asm volatile ("nop");
137    }   
138
139    // all load tasks allocate containers[x][y][n] (from local heap)
140    // and register pointers in the local stack
141    unsigned int   n;
142    unsigned int*  cont[ANALYSIS_TASKS]; 
143
144    for ( n = 0 ; n < ANALYSIS_TASKS ; n++ )
145    {
146        container[x][y][n] = malloc( 4096 );
147        cont[n]            = container[x][y][n];
148    }
149   
150    // all load tasks allocate data buffers for mwmr fifos (from local heap)
151    unsigned int*  data_l2a = malloc( ANALYSIS_TASKS<<2 );
152    unsigned int*  data_a2s = malloc( ANALYSIS_TASKS<<2 );
153    unsigned int*  data_s2l = malloc( ANALYSIS_TASKS<<2 );
154
155    // all load tasks allocate mwmr fifos descriptors (from local heap)
156    mwmr_l2a[x][y] = malloc( sizeof(mwmr_channel_t) );
157    mwmr_a2s[x][y] = malloc( sizeof(mwmr_channel_t) );
158    mwmr_s2l[x][y] = malloc( sizeof(mwmr_channel_t) );
159
160    // all "load" tasks register local pointers on mwmr fifos in local stack
161    mwmr_channel_t* fifo_l2a = mwmr_l2a[x][y];
162    mwmr_channel_t* fifo_a2s = mwmr_a2s[x][y];
163    mwmr_channel_t* fifo_s2l = mwmr_s2l[x][y];
164
165    // all "load" tasks initialise local mwmr fifos descriptors
166    // ( width = 4 bytes / depth = number of analysis tasks )
167    mwmr_init( fifo_l2a , data_l2a , 1 , ANALYSIS_TASKS );
168    mwmr_init( fifo_a2s , data_a2s , 1 , ANALYSIS_TASKS );
169    mwmr_init( fifo_s2l , data_s2l , 1 , ANALYSIS_TASKS );
170
171   
172    // all "load" tasks initialise local containers as empty in fifo_s2l
173    for ( n = 0 ; n < ANALYSIS_TASKS ; n++ ) mwmr_write( fifo_s2l , &n , 1 );
174
175    // each "load" task[x][y] signals mwmr fifos initialisation completion
176    // to other tasks in same cluster[x][y]
177    local_sync[x][y] = 1;
178
179    // "load" task[0][0] displays status
180    if ( (x==0) && (y==0) )
181    giet_shr_printf("\n*** Task load on P[%d,%d,%d] enters main loop at cycle %d\n"
182                    "      nic_rx_channel = %d / nic_tx_channel = %d\n"
183                    "      &mwmr_l2a  = %x / &data_l2a  = %x\n"
184                    "      &mwmr_a2s  = %x / &data_a2s  = %x\n"
185                    "      &mwmr_s2l  = %x / &data_s2l  = %x\n"
186                    "      &cont[0]   = %x\n"
187                    "      x_size = %d / y_size = %d / nprocs = %d\n",
188                    x , y , l , giet_proctime(), 
189                    nic_rx_channel , nic_tx_channel,
190                    (unsigned int)fifo_l2a, (unsigned int)data_l2a,
191                    (unsigned int)fifo_a2s, (unsigned int)data_a2s,
192                    (unsigned int)fifo_s2l, (unsigned int)data_s2l,
193                    (unsigned int)cont[0],
194                    X_SIZE, Y_SIZE, NB_PROCS_MAX );
195 
196    /////////////////////////////////////////////////////////////
197    // All "load" tasks enter the main loop (on containers)
198    unsigned int count = 0;     // loaded containers count
199    unsigned int index;         // available container index
200    unsigned int* temp;         // pointer on available container
201
202    while ( count < CONTAINERS_MAX ) 
203    { 
204        // get one empty count index from fifo_s2l
205        mwmr_read( fifo_s2l , &index , 1 );
206        temp = cont[index];
207
208        // get one count from  kernel rx_chbuf
209        giet_nic_rx_move( nic_rx_channel, temp );
210
211        // get packets number
212        unsigned int npackets = temp[0] & 0x0000FFFF;
213        unsigned int nwords   = temp[0] >> 16;
214
215        if ( (x==X_SIZE-1) && (y==Y_SIZE-1) )
216        giet_shr_printf("\n*** Task load on P[%d,%d,%d] get container %d at cycle %d"
217                        " : %d packets / %d words\n",
218                        x, y, l, count, giet_proctime(), npackets, nwords );
219
220        // put the full count index to fifo_l2a
221        mwmr_write( fifo_l2a, &index , 1 );
222
223        count++;
224    }
225
226    // all "load" tasks synchronise before stats
227    sbt_barrier_wait( &rx_barrier );
228
229    // "load" task[0][0] stops the NIC_CMA RX transfer and displays stats
230    if ( (x==0) && (y==0) ) 
231    {
232        giet_nic_rx_stop( nic_rx_channel );
233        giet_nic_rx_stats( nic_rx_channel );
234    }
235
236    // all "load" task exit
237    giet_exit("Task completed");
238 
239} // end load()
240
241
242//////////////////////////////////////////
243__attribute__ ((constructor)) void store()
244//////////////////////////////////////////
245{
246    // get processor identifiers
247    unsigned int    x;
248    unsigned int    y;
249    unsigned int    l;
250    giet_proc_xyp( &x, &y, &l );
251
252
253    // "store" task[0][0] initialises the barrier between all "store" tasks,
254    // allocates NIC & CMA TX channels, and starts the NIC_CMA TX transfer.
255    // Other "store" tasks wait completion.
256    if ( (x==0) && (y==0) )
257    {
258        giet_shr_printf("\n*** Task store on P[%d][%d][%d] starts at cycle %d\n",
259                        x , y , l , giet_proctime() );
260 
261        sbt_barrier_init( &tx_barrier , X_SIZE*Y_SIZE , 1 );
262        nic_tx_channel = giet_nic_tx_alloc();
263        giet_nic_tx_start( nic_tx_channel );
264        store_sync = 1;
265    }
266    else
267    {
268        while ( store_sync == 0 ) asm volatile ("nop");
269    }   
270
271    // all "store" tasks wait mwmr channels initialisation
272    while ( local_sync[x][y] == 0 ) asm volatile ("nop");
273
274    // all "store" tasks register pointers on working containers in local stack
275    unsigned int   n;
276    unsigned int*  cont[ANALYSIS_TASKS]; 
277    for ( n = 0 ; n < ANALYSIS_TASKS ; n++ )
278    {
279        cont[n] = container[x][y][n];
280    }
281   
282    // all "store" tasks register pointers on mwmr fifos in local stack
283    mwmr_channel_t* fifo_l2a = mwmr_l2a[x][y];
284    mwmr_channel_t* fifo_a2s = mwmr_a2s[x][y];
285    mwmr_channel_t* fifo_s2l = mwmr_s2l[x][y];
286
287    // "store" task[0][0] displays status
288    if ( (x==0) && (y==0) )
289    giet_shr_printf("\n*** Task store on P[%d,%d,%d] enters main loop at cycle %d\n"
290                    "      &mwmr_l2a  = %x\n"
291                    "      &mwmr_a2s  = %x\n"
292                    "      &mwmr_s2l  = %x\n"
293                    "      &cont[0]   = %x\n",
294                    x , y , l , giet_proctime(), 
295                    (unsigned int)fifo_l2a,
296                    (unsigned int)fifo_a2s,
297                    (unsigned int)fifo_s2l,
298                    (unsigned int)cont[0] );
299
300
301    /////////////////////////////////////////////////////////////
302    // all "store" tasks enter the main loop (on containers)
303    unsigned int count = 0;     // stored containers count
304    unsigned int index;         // empty container index
305    unsigned int* temp;         // pointer on empty container
306
307    while ( count < CONTAINERS_MAX ) 
308    { 
309        // get one working container index from fifo_a2s
310        mwmr_read( fifo_a2s , &index , 1 );
311        temp = cont[index];
312
313        // put one container to  kernel tx_chbuf
314        giet_nic_tx_move( nic_tx_channel, temp );
315
316        // get packets number
317        unsigned int npackets = temp[0] & 0x0000FFFF;
318        unsigned int nwords   = temp[0] >> 16;
319
320        if ( (x==X_SIZE-1) && (y==Y_SIZE-1) )
321        giet_shr_printf("\n*** Task store on P[%d,%d,%d] get container %d at cycle %d"
322                        " : %d packets / %d words\n",
323                        x, y, l, count, giet_proctime(), npackets, nwords );
324
325        // put the working container index to fifo_s2l
326        mwmr_write( fifo_s2l, &index , 1 );
327
328        count++;
329    }
330
331    // all "store" tasks synchronise before result display
332    sbt_barrier_wait( &tx_barrier );
333
334    // "store" task[0,0] stops NIC_CMA TX transfer and displays results
335    if ( (x==0) && (y==0) )
336    {
337        giet_nic_tx_stop( nic_tx_channel );
338
339        giet_shr_printf("\n@@@@ Classification Results @@@\n"
340                        " - TYPE 0 : %d packets\n"
341                        " - TYPE 1 : %d packets\n"
342                        " - TYPE 2 : %d packets\n"
343                        " - TYPE 3 : %d packets\n"
344                        " - TYPE 4 : %d packets\n"
345                        " - TYPE 5 : %d packets\n"
346                        " - TYPE 6 : %d packets\n"
347                        " - TYPE 7 : %d packets\n"
348                        " - TYPE 8 : %d packets\n"
349                        " - TYPE 9 : %d packets\n"
350                        " - TYPE A : %d packets\n"
351                        " - TYPE B : %d packets\n"
352                        " - TYPE C : %d packets\n"
353                        " - TYPE D : %d packets\n"
354                        " - TYPE E : %d packets\n"
355                        " - TYPE F : %d packets\n"
356                        "    TOTAL = %d packets\n",
357                        counter[0x0], counter[0x1], counter[0x2], counter[0x3],
358                        counter[0x4], counter[0x5], counter[0x6], counter[0x7],
359                        counter[0x8], counter[0x9], counter[0xA], counter[0xB],
360                        counter[0xC], counter[0xD], counter[0xE], counter[0xF],
361                        counter[0x0]+ counter[0x1]+ counter[0x2]+ counter[0x3]+
362                        counter[0x4]+ counter[0x5]+ counter[0x6]+ counter[0x7]+
363                        counter[0x8]+ counter[0x9]+ counter[0xA]+ counter[0xB]+
364                        counter[0xC]+ counter[0xD]+ counter[0xE]+ counter[0xF] );
365
366        giet_nic_tx_stats( nic_tx_channel );
367    }
368
369    // all "store" task exit
370    giet_exit("Task completed");
371
372} // end store()
373
374
375////////////////////////////////////////////
376__attribute__ ((constructor)) void analyse()
377////////////////////////////////////////////
378{
379    // get processor identifiers
380    unsigned int    x;
381    unsigned int    y;
382    unsigned int    l;
383    giet_proc_xyp( &x, &y, &l );
384
385    if ( (x==0) && (y==0) )
386    {
387        giet_shr_printf("\n*** Task analyse on P[%d][%d][%d] starts at cycle %d\n",
388                        x , y , l , giet_proctime() );
389    }
390 
391    // all "analyse" tasks wait mwmr channels initialisation
392    while ( local_sync[x][y] == 0 ) asm volatile ("nop");
393
394    // all "analyse" tasks register pointers on working containers in local stack
395    unsigned int   n;
396    unsigned int*  cont[ANALYSIS_TASKS]; 
397    for ( n = 0 ; n < ANALYSIS_TASKS ; n++ )
398    {
399        cont[n] = container[x][y][n];
400    }
401
402    // all "analyse" tasks register pointers on mwmr fifos in local stack
403    mwmr_channel_t* fifo_l2a = mwmr_l2a[x][y];
404    mwmr_channel_t* fifo_a2s = mwmr_a2s[x][y];
405
406    // "analyse" task[0][0] display status
407    if ( (x==0) && (y==0) )
408    giet_shr_printf("\n*** Task analyse on P[%d,%d,%d] enters main loop at cycle %d\n"
409                    "       &mwmr_l2a = %x\n"
410                    "       &mwmr_a2s = %x\n"
411                    "       &cont[0]  = %x\n",
412                    x, y, l, giet_proctime(), 
413                    (unsigned int)fifo_l2a,
414                    (unsigned int)fifo_a2s,
415                    (unsigned int)cont[0] );
416     
417    /////////////////////////////////////////////////////////////
418    // all "analyse" tasks enter the main loop (on containers)
419    unsigned int  index;           // available container index
420    unsigned int* temp;            // pointer on available container
421    unsigned int  nwords;          // number of words in container
422    unsigned int  npackets;        // number of packets in container
423    unsigned int  length;          // number of bytes in current packet
424    unsigned int  first;           // current packet first word in container
425    unsigned int  type;            // current packet type
426    unsigned int  p;               // current packet index
427
428#if VERBOSE_ANALYSE
429    unsigned int       verbose_len[10]; // save length for all packets in one container
430    unsigned long long verbose_dst[10]; // save length for all packets in one container
431    unsigned long long verbose_src[10]; // save length for all packets in one container
432#endif
433
434    while ( 1 )
435    { 
436
437#if VERBOSE_ANALYSE
438            for( p = 0 ; p < 10 ; p++ )
439            {
440                verbose_len[p] = 0;
441                verbose_dst[p] = 0;
442                verbose_src[p] = 0;
443            }
444#endif
445        // get one working container index from fifo_l2a
446        mwmr_read( fifo_l2a , &index , 1 );
447        temp = cont[index];
448
449        // get packets number and words number
450        npackets = temp[0] & 0x0000FFFF;
451        nwords   = temp[0] >> 16;
452
453        if ( (x==0) && (y==0) )
454        giet_shr_printf("\n*** Task analyse on P[%d,%d,%d] get container at cycle %d"
455                        " : %d packets / %d words\n",
456                                                x, y, l, giet_proctime(), npackets, nwords );
457
458        // initialize word index in container
459        first = 34;
460
461        // loop on packets
462        for( p = 0 ; p < npackets ; p++ )
463        {
464            // get packet length from container header
465            if ( (p & 0x1) == 0 )  length = temp[1+(p>>1)] >> 16;
466            else                   length = temp[1+(p>>1)] & 0x0000FFFF;
467
468            // compute packet DST and SRC MAC addresses
469            unsigned int word0 = temp[first];
470            unsigned int word1 = temp[first + 1];
471            unsigned int word2 = temp[first + 2];
472
473            unsigned long long dst = ((unsigned long long)(word1 & 0xFFFF0000)>>16) |
474                                     (((unsigned long long)word0)<<16);
475            unsigned long long src = ((unsigned long long)(word1 & 0x0000FFFF)<<32) |
476                                     ((unsigned long long)word2);
477#if VERBOSE_ANALYSE
478            if ( p < 10 )
479            {
480                verbose_len[p] = length;
481                verbose_dst[p] = dst;
482                verbose_src[p] = src;
483            }
484#endif
485            // compute type from SRC MAC address and increment counter
486            type = word1 & 0x0000000F;
487            atomic_increment( &counter[type], 1 );
488
489            // exchange SRC & DST MAC addresses for TX
490            temp[first]     = ((word1 & 0x0000FFFF)<<16) | ((word2 & 0xFFFF0000)>>16);
491            temp[first + 1] = ((word2 & 0x0000FFFF)<<16) | ((word0 & 0xFFFF0000)>>16);
492            temp[first + 2] = ((word0 & 0x0000FFFF)<<16) | ((word1 & 0xFFFF0000)>>16);
493
494            // update first word index
495            if ( length & 0x3 ) first += (length>>2)+1;
496            else                first += (length>>2);
497        }
498       
499#if VERBOSE_ANALYSE
500        if ( (x==0) && (y==0) )
501        giet_shr_printf("\n*** Task analyse on P[%d,%d,%d] completes at cycle %d\n"
502                        "   - Packet 0 : plen = %d / dst_mac = %l / src_mac = %l\n"
503                        "   - Packet 1 : plen = %d / dst_mac = %l / src_mac = %l\n"
504                        "   - Packet 2 : plen = %d / dst_mac = %l / src_mac = %l\n"
505                        "   - Packet 3 : plen = %d / dst_mac = %l / src_mac = %l\n"
506                        "   - Packet 4 : plen = %d / dst_mac = %l / src_mac = %l\n"
507                        "   - Packet 5 : plen = %d / dst_mac = %l / src_mac = %l\n"
508                        "   - Packet 6 : plen = %d / dst_mac = %l / src_mac = %l\n"
509                        "   - Packet 7 : plen = %d / dst_mac = %l / src_mac = %l\n"
510                        "   - Packet 8 : plen = %d / dst_mac = %l / src_mac = %l\n"
511                        "   - Packet 9 : plen = %d / dst_mac = %l / src_mac = %l\n",
512                        x , y , l , giet_proctime() , 
513                        verbose_len[0] , verbose_dst[0] , verbose_src[0] ,
514                        verbose_len[1] , verbose_dst[1] , verbose_src[1] ,
515                        verbose_len[2] , verbose_dst[2] , verbose_src[2] ,
516                        verbose_len[3] , verbose_dst[3] , verbose_src[3] ,
517                        verbose_len[4] , verbose_dst[4] , verbose_src[4] ,
518                        verbose_len[5] , verbose_dst[5] , verbose_src[5] ,
519                        verbose_len[6] , verbose_dst[6] , verbose_src[6] ,
520                        verbose_len[7] , verbose_dst[7] , verbose_src[7] ,
521                        verbose_len[8] , verbose_dst[8] , verbose_src[8] ,
522                        verbose_len[9] , verbose_dst[9] , verbose_src[9] );
523#endif
524           
525        // pseudo-random delay
526        for( p = 0 ; p < (giet_rand()>>4) ; p++ ) asm volatile ("nop");
527
528        // put the working container index to fifo_a2s
529        mwmr_write( fifo_a2s , &index , 1 );
530    }
531} // end analyse()
532
Note: See TracBrowser for help on using the repository browser.