source: soft/giet_vm/applications/classif/main.c @ 475

Last change on this file since 475 was 473, checked in by alain, 10 years ago

Improving the classif application:
We use now the distributed heap, and the user malloc()
to implement the distributed MWMR buffers and the
distributed MWMR descriptors.

File size: 11.0 KB
Line 
1/////////////////////////////////////////////////////////////////////////////////////////////
2// File   : main.c   (for classif application)
3// Date   : november 2014
4// author : Alain Greiner
5/////////////////////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application takes a Gigabit Ethernet packets stream, and
7// makes a packet analysis and classification, based on the source MAC address.
8// It uses the NIC peripheral, and the chained buffer built by the CMA component
9// to consume packets on the Gigabit Ethernet port.
10//
11// It is described as a TCG (Task and Communication Graph) containing
12// - one "load" task per cluster.
13// - from one to three "analyse" tasks per cluster.
14// In each cluster, the "load" task communicates with the local "analyse" tasks through
15// a local MWMR fifo containing NB_PROCS_MAX containers (one item = one container).
16// The MWMR fifo descriptor and the data buffer containing the containers are allocated
17// in the local heap of each cluster (up to 16 clusters); only the pointers are global.
18//
19// Initialisation is done in two steps by the "load" tasks:
20// - Task "load" in cluster[0][0] initialises NIC & CMA channel, and initialises
21//   the barrier between all "load" tasks. Other "load" tasks are waiting on the
22//   global_sync synchronisation variable.
23// - In each cluster[x][y], the "load" task allocates the MWMR fifo descriptor & the data
24//   buffer in the local heap, and stores the fifo pointer in a global array of pointers.
25//   The "analyse" tasks are waiting on the local_sync[x][y] variables.
26//     
27// When initialisation is completed, all "load" and "analyse" tasks loop on containers:
28// 1) The "load" task transfers containers from the kernel chbuf associated to the
29//    NIC_RX channel (in cluster[0][0]), to the local MWMR fifo (in cluster[x][y]),
30//    after an intermediate copy in a private stack buffer.
31//    Each "load" task loads CONTAINERS_MAX containers before exit, and the
32//    task in cluster[0,0] displays the results stored in global counters filled
33//    by the "analyse" tasks when all "load" tasks reach the barrier.
34//
35// 2) The "analyse" task transfers one container from the local MWMR fifo to a private
36//    local buffer. It analyses each packet contained in the container, computes the
37//    packet type, depending on the 4 MSB bits of the source MAC address,
38//    and increments the corresponding counter.
39//
40// It uses the following hardware parameters, defined in the hard_config.h file:
41// - X_SIZE       : number of clusters in a row
42// - Y_SIZE       : number of clusters in a column
43// - NB_PROCS_MAX : number of processors per cluster
44/////////////////////////////////////////////////////////////////////////////////////////////
45
46#include "stdio.h"
47#include "barrier.h"
48#include "malloc.h"
49#include "user_lock.h"
50#include "mwmr_channel.h"
51#include "hard_config.h"
52
53#define LOAD_VERBOSE     0
54#define ANALYSE_VERBOSE  0
55#define CONTAINERS_MAX   10
56
57///////////////////////////////////////////////////////////////////////////////////////////
58//    Global variables
59// The communication channels are distributed in the clusters,
60// but the pointers arrays are global variables in cluster[0][0]
61///////////////////////////////////////////////////////////////////////////////////////////
62
63mwmr_channel_t*  mwmr[X_SIZE][Y_SIZE];        // distributed MWMR fifos pointers
64
65unsigned int     local_sync[X_SIZE][Y_SIZE];  // distributed synchros "load" / "analyse"
66
67unsigned int     global_sync = 0;             // global synchro between "load" tasks
68
69unsigned int     count[16];                   // instrumentation counters
70
71giet_barrier_t   barrier;                     // barrier between "load" (instrumentation)
72
73unsigned int     nic_channel;                 // allocated NIC channel index
74
75///////////////////////////////////////////////////////////////////////////////////////////
76__attribute__ ((constructor)) void load()
77///////////////////////////////////////////////////////////////////////////////////////////
78{
79    // get processor identifiers
80    unsigned int    x;
81    unsigned int    y;
82    unsigned int    l;
83    giet_proc_xyp( &x, &y, &l );
84
85    if (X_SIZE > 4 )  giet_exit("The X_SIZE parameter cannot be larger than 4\n");
86    if (Y_SIZE > 4 )  giet_exit("The Y_SIZE parameter cannot be larger than 4\n");
87
88    // local buffer to store one container in private stack
89    unsigned int  temp[1024];
90
91// giet_shr_printf("\n@@@ P[%d,%d,%d] enters load task at cycle %d\n",
92//                 x, y, l, giet_proctime() );
93
94    // allocates data buffer for MWMR fifo in local heap
95    unsigned int*  data = malloc( NB_PROCS_MAX<<12 );
96
97//giet_shr_printf("\n@@@ P[%d,%d,%d] completes data malloc at cycle %d "
98//                "/ &data = %x\n", x, y, l, giet_proctime(), (unsigned int)data );
99
100    // allocates MWMR fifo descriptor in local heap
101    mwmr_channel_t*  fifo = malloc( sizeof(mwmr_channel_t) );
102
103//giet_shr_printf("\n@@@ P[%d,%d,%d] completes mwmr malloc at cycle %d "
104//                "/ &mwmr = %x\n", x, y, l, giet_proctime(), (unsigned int)mwmr );
105
106    // makes copy of pointer in global array for "analyse" tasks
107    mwmr[x][y] = fifo;
108
109    // display status for cluster[X_SIZE-1][Y_SIZE-1]
110    if ( (x==X_SIZE-1) && (y==Y_SIZE-1) )
111    giet_shr_printf("\n*** Task load starts on P[%d,%d,%d] at cycle %d\n"
112                    "      &fifo  = %x / &data  = %x / &sync  = %x\n"
113                    "      x_size = %d / y_size = %d / nprocs = %d\n",
114                    x , y , l , giet_proctime() , 
115                    (unsigned int)fifo , (unsigned int)data, (unsigned int)(&local_sync[x][y]) ,
116                    X_SIZE, Y_SIZE, NB_PROCS_MAX ); 
117
118    // Task load on cluster[0,0] makes global initialisation:
119    // - NIC & CMA channels allocation & initialisation.
120    // - barrier for all load tasks initialisation.
121    // Other load task wait completion.
122    if ( (x==0) && (y==0) )
123    {
124        // get NIC_RX channel
125        nic_channel = giet_nic_rx_alloc();
126
127        // start CMA transfer
128        giet_nic_rx_start();
129
130        // barrier init
131        barrier_init( &barrier, X_SIZE * Y_SIZE );
132
133        // clear NIC RX channels stats
134        giet_nic_rx_clear(); 
135
136        global_sync = 1;
137    }
138    else
139    {
140        while ( global_sync == 0 ) asm volatile ("nop");
141    }   
142
143    // Each load task initialises local MWMR fifo (width = 4kbytes / depth = NB_PROCS_MAX)
144    mwmr_init( fifo , data , 1024 , NB_PROCS_MAX );
145
146    // signal MWMR fifo initialisation completion to analyse tasks
147    local_sync[x][y] = 1;
148
149    // main loop (on containers)
150    unsigned int container = 0;
151    while ( container < CONTAINERS_MAX ) 
152    { 
153        // get one container from kernel chbuf
154        giet_nic_rx_move( nic_channel, temp );
155
156        // get packets number
157        unsigned int npackets = temp[0] & 0x0000FFFF;
158        unsigned int nwords   = temp[0] >> 16;
159
160        if ( (x==X_SIZE-1) && (y==Y_SIZE-1) )
161        giet_shr_printf("\nTask load on P[%d,%d,%d] get container %d at cycle %d"
162                        " : %d packets / %d words\n",
163                        x, y, l, container, giet_proctime(), npackets, nwords );
164
165        // put container to MWMR channel
166        mwmr_write( fifo, temp, 1 );
167
168        container++;
169    }
170
171    // all load tasks synchronise before result display
172    barrier_wait( &barrier );
173
174    // Task load in cluster[0,0] stops NIC and displays results
175    if ( (x==0) && (y==0) )
176    {
177        giet_nic_rx_stop();
178
179        giet_shr_printf("\n@@@@ Clasification Results @@@\n"
180                        " - TYPE 0 : %d packets\n"
181                        " - TYPE 1 : %d packets\n"
182                        " - TYPE 2 : %d packets\n"
183                        " - TYPE 3 : %d packets\n"
184                        " - TYPE 4 : %d packets\n"
185                        " - TYPE 5 : %d packets\n"
186                        " - TYPE 6 : %d packets\n"
187                        " - TYPE 7 : %d packets\n"
188                        " - TYPE 8 : %d packets\n"
189                        " - TYPE 9 : %d packets\n"
190                        " - TYPE A : %d packets\n"
191                        " - TYPE B : %d packets\n"
192                        " - TYPE C : %d packets\n"
193                        " - TYPE D : %d packets\n"
194                        " - TYPE E : %d packets\n"
195                        " - TYPE F : %d packets\n",
196                        count[0x0], count[0x1], count[0x2], count[0x3],
197                        count[0x4], count[0x5], count[0x6], count[0x7],
198                        count[0x8], count[0x9], count[0xA], count[0xB],
199                        count[0xC], count[0xD], count[0xE], count[0xF] );
200
201        giet_nic_rx_stats();
202
203    }
204
205    // all load tasks synchronise before exit
206    barrier_wait( &barrier );
207
208    giet_exit("Task completed");
209 
210} // end load()
211
212////////////////////////////////////////////
213__attribute__ ((constructor)) void analyse()
214////////////////////////////////////////////
215{
216    // get processor identifiers
217    unsigned int    x;
218    unsigned int    y;
219    unsigned int    l;
220    giet_proc_xyp( &x, &y, &l );
221
222    // local buffer to store one container
223    unsigned int  temp[1024];
224
225    // wait MWMR channel initialisation (done by task load)
226    while ( local_sync[x][y] == 0 ) asm volatile ("nop");
227
228    // get pointer on MWMR channel descriptor
229    mwmr_channel_t* fifo = mwmr[x][y];
230
231    // display status for cluster[X_SIZE-1][Y_SIZE-1]
232    if ( (x==X_SIZE-1) && (y==Y_SIZE-1) )
233    giet_shr_printf("\n*** Task analyse starts on P[%d,%d,%d] at cycle %d\n"
234                    "       &fifo = %x / &sync = %x\n",
235                    x, y, l, giet_proctime(), 
236                    (unsigned int)fifo, (unsigned int)(&local_sync[x][y]) );
237   
238    // main loop (on containers)
239    unsigned int nwords;     // number of words in container
240    unsigned int npackets;   // number of packets in container
241    unsigned int length;     // number of bytes in current packet
242    unsigned int word;       // current packet first word in container
243    unsigned int type;       // current packet type
244    unsigned int p;          // current packet index
245    while ( 1 )
246    { 
247        // get one container from MWMR fifo
248        mwmr_read( fifo, temp, 1 );
249
250        // get packets number
251        npackets = temp[0] & 0x0000FFFF;
252        nwords   = temp[0] >> 16;
253
254        if ( (x==X_SIZE-1) && (y==Y_SIZE-1) )
255        giet_shr_printf("\nTask analyse on P[%d,%d,%d] get container at cycle %d"
256                        " : %d packets / %d words\n",
257                                                x, y, l, giet_proctime(), npackets, nwords );
258
259        // initialize word index in container
260        word = 34;
261
262        // loop on packets
263        for( p = 0 ; p < npackets ; p++ )
264        {
265            // get packet length from container header
266            if ( (p & 0x1) == 0 )  length = temp[1+(p>>1)] >> 16;
267            else                   length = temp[1+(p>>1)] & 0x0000FFFF;
268
269            // get packet type (source mac address 4 MSB bits)
270            type = (temp[word+1] & 0x0000F000) >> 12;
271
272            // increment counter
273            atomic_increment( &count[type], 1 );
274
275            // update word index
276            if ( length & 0x3 ) word += (length>>2)+1;
277            else                word += (length>>2);
278        }
279    }
280} // end analyse()
281
Note: See TracBrowser for help on using the repository browser.