Changeset 635 for trunk/kernel/libk/remote_barrier.c
- Timestamp: Jun 26, 2019, 11:42:37 AM (5 years ago)
- File: 1 edited
Legend:
- Unmodified
- Added
- Removed
trunk/kernel/libk/remote_barrier.c
r632 r635 83 83 pthread_barrierattr_t * attr ) 84 84 { 85 xptr_t gen_barrier_xp; // extended pointer on generic barrier descriptor86 85 generic_barrier_t * gen_barrier_ptr; // local pointer on generic barrier descriptor 87 86 void * barrier; // local pointer on implementation barrier descriptor … … 97 96 98 97 // allocate memory for generic barrier descriptor 99 if( ref_cxy == local_cxy ) // reference cluster is local 100 { 101 req.type = KMEM_GEN_BARRIER; 102 req.flags = AF_ZERO; 103 gen_barrier_ptr = kmem_alloc( &req ); 104 gen_barrier_xp = XPTR( local_cxy , gen_barrier_ptr ); 105 } 106 else // reference cluster is remote 107 { 108 rpc_kcm_alloc_client( ref_cxy, 109 KMEM_GEN_BARRIER, 110 &gen_barrier_xp ); 111 gen_barrier_ptr = GET_PTR( gen_barrier_xp ); 112 } 98 req.type = KMEM_KCM; 99 req.order = bits_log2( sizeof(generic_barrier_t) ); 100 req.flags = AF_ZERO | AF_KERNEL; 101 gen_barrier_ptr = kmem_remote_alloc( ref_cxy , &req ); 113 102 114 103 if( gen_barrier_ptr == NULL ) … … 124 113 barrier = simple_barrier_create( count ); 125 114 126 if( barrier == NULL ) 127 { 128 printk("\n[ERROR] in %s : cannot create simple barrier\n", __FUNCTION__); 129 return -1; 130 } 115 if( barrier == NULL ) return -1; 131 116 } 132 117 else // QDT barrier implementation … … 147 132 barrier = dqt_barrier_create( x_size , y_size , nthreads ); 148 133 149 if( barrier == NULL ) 150 { 151 printk("\n[ERROR] in %s : cannot create DQT barrier descriptor\n", __FUNCTION__); 152 return -1; 153 } 134 if( barrier == NULL ) return -1; 154 135 } 155 136 … … 211 192 212 193 // release memory allocated to barrier descriptor 213 if( gen_barrier_cxy == local_cxy ) 214 { 215 req.type = KMEM_GEN_BARRIER; 216 req.ptr = gen_barrier_ptr; 217 kmem_free( &req ); 218 } 219 else 220 { 221 rpc_kcm_free_client( gen_barrier_cxy, 222 gen_barrier_ptr, 223 KMEM_GEN_BARRIER ); 224 } 194 req.type = KMEM_KCM; 195 req.ptr = gen_barrier_ptr; 196 kmem_remote_free( ref_cxy , &req ); 197 225 198 } // end 
generic_barrier_destroy() 226 199 … … 273 246 simple_barrier_t * simple_barrier_create( uint32_t count ) 274 247 { 275 xptr_t barrier_xp;248 kmem_req_t req; 276 249 simple_barrier_t * barrier; 277 250 … … 285 258 286 259 // allocate memory for simple barrier descriptor 287 if( ref_cxy == local_cxy ) // reference is local 288 { 289 kmem_req_t req; 290 req.type = KMEM_SMP_BARRIER; 291 req.flags = AF_ZERO; 292 barrier = kmem_alloc( &req ); 293 barrier_xp = XPTR( local_cxy , barrier ); 294 } 295 else // reference is remote 296 { 297 rpc_kcm_alloc_client( ref_cxy, 298 KMEM_SMP_BARRIER, 299 &barrier_xp ); 300 barrier = GET_PTR( barrier_xp ); 301 } 302 303 if( barrier == NULL ) return NULL; 260 req.type = KMEM_KCM; 261 req.order = bits_log2( sizeof(simple_barrier_t) ); 262 req.flags = AF_ZERO | AF_KERNEL; 263 barrier = kmem_remote_alloc( ref_cxy , &req ); 264 265 if( barrier == NULL ) 266 { 267 printk("\n[ERROR] in %s : cannot create simple barrier\n", __FUNCTION__ ); 268 return NULL; 269 } 304 270 305 271 // initialise simple barrier descriptor … … 325 291 void simple_barrier_destroy( xptr_t barrier_xp ) 326 292 { 293 kmem_req_t req; 294 327 295 // get barrier cluster and local pointer 328 296 cxy_t barrier_cxy = GET_CXY( barrier_xp ); … … 330 298 331 299 // release memory allocated for barrier descriptor 332 if( barrier_cxy == local_cxy ) 333 { 334 kmem_req_t req; 335 req.type = KMEM_SMP_BARRIER; 336 req.ptr = barrier_ptr; 337 kmem_free( &req ); 338 } 339 else 340 { 341 rpc_kcm_free_client( barrier_cxy, 342 barrier_ptr, 343 KMEM_SMP_BARRIER ); 344 } 300 req.type = KMEM_KCM; 301 req.ptr = barrier_ptr; 302 kmem_remote_free( barrier_cxy , &req ); 345 303 346 304 #if DEBUG_BARRIER_DESTROY … … 498 456 499 457 #if DEBUG_BARRIER_CREATE 500 staticvoid dqt_barrier_display( xptr_t barrier_xp );458 void dqt_barrier_display( xptr_t barrier_xp ); 501 459 #endif 502 460 … … 506 464 uint32_t nthreads ) 507 465 { 508 xptr_t dqt_page_xp;509 page_t * rpc_page;510 xptr_t rpc_page_xp;511 
466 dqt_barrier_t * barrier; // local pointer on DQT barrier descriptor 512 467 xptr_t barrier_xp; // extended pointer on DQT barrier descriptor 513 468 uint32_t z; // actual DQT size == max(x_size,y_size) 514 469 uint32_t levels; // actual number of DQT levels 515 xptr_t rpc_xp; // extended pointer on RPC descriptors array516 rpc_desc_t * rpc; // pointer on RPC descriptors array517 uint32_t responses; // responses counter for parallel RPCs518 reg_t save_sr; // for critical section519 470 uint32_t x; // X coordinate in QDT mesh 520 471 uint32_t y; // Y coordinate in QDT mesh … … 522 473 kmem_req_t req; // kmem request 523 474 524 // compute size and number of DQT levels475 // compute number of DQT levels, depending on the mesh size 525 476 z = (x_size > y_size) ? x_size : y_size; 526 477 levels = (z < 2) ? 1 : (z < 3) ? 2 : (z < 5) ? 3 : (z < 9) ? 4 : 5; … … 529 480 assert( (z <= 16) , "DQT mesh size larger than (16*16)\n"); 530 481 531 // check RPC descriptor size532 assert( (sizeof(rpc_desc_t) <= 128), "RPC descriptor larger than 128 bytes\n");533 534 482 // check size of an array of 5 DQT nodes 535 483 assert( (sizeof(dqt_node_t) * 5 <= 512 ), "array of DQT nodes larger than 512 bytes\n"); … … 538 486 assert( (sizeof(dqt_barrier_t) <= 0x4000 ), "DQT barrier descriptor larger than 4 pages\n"); 539 487 540 // get pointer on local client process descriptor488 // get pointer on client thread and process descriptors 541 489 thread_t * this = CURRENT_THREAD; 542 490 process_t * process = this->process; … … 553 501 cxy_t ref_cxy = GET_CXY( ref_xp ); 554 502 555 // 1. allocate 4 4 Kbytes pages for DQT barrier descriptor in reference cluster 556 dqt_page_xp = ppm_remote_alloc_pages( ref_cxy , 2 ); 557 558 if( dqt_page_xp == XPTR_NULL ) return NULL; 559 560 // get pointers on DQT barrier descriptor 561 barrier_xp = ppm_page2base( dqt_page_xp ); 562 barrier = GET_PTR( barrier_xp ); 503 // 1. 
allocate 4 small pages for the DQT barrier descriptor in reference cluster 504 req.type = KMEM_PPM; 505 req.order = 2; // 4 small pages == 16 Kbytes 506 req.flags = AF_ZERO | AF_KERNEL; 507 barrier = kmem_remote_alloc( ref_cxy , &req ); 508 509 if( barrier == NULL ) 510 { 511 printk("\n[ERROR] in %s : cannot create DQT barrier\n", __FUNCTION__ ); 512 return NULL; 513 } 514 515 // get pointers on DQT barrier descriptor in reference cluster 516 barrier_xp = XPTR( ref_cxy , barrier ); 563 517 564 518 // initialize global parameters in DQT barrier descriptor … … 569 523 #if DEBUG_BARRIER_CREATE 570 524 if( cycle > DEBUG_BARRIER_CREATE ) 571 printk("\n[%s] thread[%x,%x] created DQT barrier descriptor at(%x,%x)\n",525 printk("\n[%s] thread[%x,%x] created DQT barrier descriptor(%x,%x)\n", 572 526 __FUNCTION__, process->pid, this->trdid, ref_cxy, barrier ); 573 527 #endif 574 528 575 // 2. allocate memory from local cluster for an array of 256 RPCs descriptors 576 // cannot share the RPC descriptor, because the returned argument is not shared 577 req.type = KMEM_PAGE; 578 req.size = 3; // 8 pages == 32 Kbytes 579 req.flags = AF_ZERO; 580 rpc_page = kmem_alloc( &req ); 581 rpc_page_xp = XPTR( local_cxy , rpc_page ); 582 583 // get pointers on RPC descriptors array 584 rpc_xp = ppm_page2base( rpc_page_xp ); 585 rpc = GET_PTR( rpc_xp ); 586 587 #if DEBUG_BARRIER_CREATE 588 if( cycle > DEBUG_BARRIER_CREATE ) 589 printk("\n[%s] thread[%x,%x] created RPC descriptors array at (%x,%s)\n", 590 __FUNCTION__, process->pid, this->trdid, local_cxy, rpc ); 591 #endif 592 593 // 3. send parallel RPCs to all existing clusters covered by the DQT 594 // to allocate memory for an array of 5 DQT nodes in each cluster 529 // 2. 
allocate memory for an array of 5 DQT nodes 530 // in all existing clusters covered by the DQDT 595 531 // (5 nodes per cluster <= 512 bytes per cluster) 596 597 responses = 0; // initialize RPC responses counter 598 599 // mask IRQs 600 hal_disable_irq( &save_sr); 601 602 // client thread blocks itself 603 thread_block( XPTR( local_cxy , this ) , THREAD_BLOCKED_RPC ); 604 532 // and complete barrier descriptor initialisation. 605 533 for ( x = 0 ; x < x_size ; x++ ) 606 534 { 607 535 for ( y = 0 ; y < y_size ; y++ ) 608 536 { 609 // send RPC to existing clusters only 537 cxy_t cxy = HAL_CXY_FROM_XY( x , y ); // target cluster identifier 538 xptr_t local_array_xp; // xptr of nodes array in cluster cxy 539 540 // allocate memory in existing clusters only 610 541 if( LOCAL_CLUSTER->cluster_info[x][y] ) 611 542 { 612 cxy_t cxy = HAL_CXY_FROM_XY( x , y ); // target cluster identifier 613 614 // build a specific RPC descriptor for each target cluster 615 rpc[cxy].rsp = &responses; 616 rpc[cxy].blocking = false; 617 rpc[cxy].index = RPC_KCM_ALLOC; 618 rpc[cxy].thread = this; 619 rpc[cxy].lid = this->core->lid; 620 rpc[cxy].args[0] = (uint64_t)KMEM_512_BYTES; 621 622 // atomically increment expected responses counter 623 hal_atomic_add( &responses , 1 ); 624 625 // send a non-blocking RPC to allocate 512 bytes in target cluster 626 rpc_send( cxy , &rpc[cxy] ); 627 } 628 } 629 } 630 631 #if DEBUG_BARRIER_CREATE 632 if( cycle > DEBUG_BARRIER_CREATE ) 633 printk("\n[%s] thread[%x,%x] sent all RPC requests to allocate dqt_nodes array\n", 634 __FUNCTION__, process->pid, this->trdid ); 635 #endif 636 637 // client thread deschedule 638 sched_yield("blocked on parallel rpc_kcm_alloc"); 639 640 // restore IRQs 641 hal_restore_irq( save_sr); 642 643 // 4. 
initialize the node_xp[x][y][l] array in DQT barrier descriptor 644 // the node_xp[x][y][0] value is available in rpc.args[1] 645 646 #if DEBUG_BARRIER_CREATE 647 if( cycle > DEBUG_BARRIER_CREATE ) 648 printk("\n[%s] thread[%x,%x] initialises array of pointers on dqt_nodes\n", 649 __FUNCTION__, process->pid, this->trdid ); 650 #endif 651 652 for ( x = 0 ; x < x_size ; x++ ) 653 { 654 for ( y = 0 ; y < y_size ; y++ ) 655 { 656 cxy_t cxy = HAL_CXY_FROM_XY( x , y ); // target cluster identifier 657 xptr_t array_xp = (xptr_t)rpc[cxy].args[1]; // x_pointer on node array 658 uint32_t offset = sizeof( dqt_node_t ); // size of a DQT node 659 660 // set values into the node_xp[x][y][l] array 661 for ( l = 0 ; l < levels ; l++ ) 662 { 663 xptr_t node_xp = array_xp + (offset * l); 664 hal_remote_s64( XPTR( ref_cxy , &barrier->node_xp[x][y][l] ), node_xp ); 665 666 #if DEBUG_BARRIER_CREATE 543 req.type = KMEM_KCM; 544 req.order = 9; // 512 bytes 545 req.flags = AF_ZERO | AF_KERNEL; 546 547 void * ptr = kmem_remote_alloc( cxy , &req ); 548 549 if( ptr == NULL ) 550 { 551 printk("\n[ERROR] in %s : cannot allocate DQT in cluster %x\n", 552 __FUNCTION__, cxy ); 553 return NULL; 554 } 555 556 // build extended pointer on local node array in cluster cxy 557 local_array_xp = XPTR( cxy , ptr ); 558 559 // initialize the node_xp[x][y][l] array in barrier descriptor 560 for ( l = 0 ; l < levels ; l++ ) 561 { 562 xptr_t node_xp = local_array_xp + ( l * sizeof(dqt_node_t) ); 563 hal_remote_s64( XPTR( ref_cxy , &barrier->node_xp[x][y][l] ), node_xp ); 564 565 #if (DEBUG_BARRIER_CREATE & 1) 667 566 if( cycle > DEBUG_BARRIER_CREATE ) 668 567 printk(" - dqt_node_xp[%d,%d,%d] = (%x,%x) / &dqt_node_xp = %x\n", 669 568 x , y , l , GET_CXY( node_xp ), GET_PTR( node_xp ), &barrier->node_xp[x][y][l] ); 670 569 #endif 570 } 671 571 } 672 } 673 } 674 675 // 5. 
release memory locally allocated for the RPCs array 676 req.type = KMEM_PAGE; 677 req.ptr = rpc_page; 678 kmem_free( &req ); 572 else // register XPTR_NULL for all non-existing entries 573 { 574 for ( l = 0 ; l < levels ; l++ ) 575 { 576 hal_remote_s64( XPTR( ref_cxy , &barrier->node_xp[x][y][l] ), XPTR_NULL ); 577 } 578 } 579 } // end for y 580 } // end for x 679 581 680 582 #if DEBUG_BARRIER_CREATE 681 583 if( cycle > DEBUG_BARRIER_CREATE ) 682 printk("\n[%s] thread[%x,%x] released memory for RPC descriptors array\n",584 printk("\n[%s] thread[%x,%x] initialized array of pointers in DQT barrier\n", 683 585 __FUNCTION__, process->pid, this->trdid ); 684 586 #endif 685 587 686 // 6. initialise all distributed DQT nodes using remote accesses588 // 3. initialise all distributed DQT nodes using remote accesses 687 589 // and the pointers stored in the node_xp[x][y][l] array 688 590 for ( x = 0 ; x < x_size ; x++ ) … … 827 729 void dqt_barrier_destroy( xptr_t barrier_xp ) 828 730 { 829 page_t * rpc_page;830 xptr_t rpc_page_xp;831 rpc_desc_t * rpc; // local pointer on RPC descriptors array832 xptr_t rpc_xp; // extended pointer on RPC descriptor array833 reg_t save_sr; // for critical section834 731 kmem_req_t req; // kmem request 835 836 thread_t * this = CURRENT_THREAD; 732 uint32_t x; 733 uint32_t y; 734 837 735 838 736 // get DQT barrier descriptor cluster and local pointer … … 841 739 842 740 #if DEBUG_BARRIER_DESTROY 741 thread_t * this = CURRENT_THREAD; 843 742 uint32_t cycle = (uint32_t)hal_get_cycles(); 844 743 if( cycle > DEBUG_BARRIER_DESTROY ) … … 851 750 uint32_t y_size = hal_remote_l32( XPTR( barrier_cxy , &barrier_ptr->y_size ) ); 852 751 853 // 1. 
allocate memory from local cluster for an array of 256 RPCs descriptors 854 // cannot share the RPC descriptor, because the "buf" argument is not shared 855 req.type = KMEM_PAGE; 856 req.size = 3; // 8 pages == 32 Kbytes 857 req.flags = AF_ZERO; 858 rpc_page = kmem_alloc( &req ); 859 rpc_page_xp = XPTR( local_cxy , rpc_page ); 860 861 // get pointers on RPC descriptors array 862 rpc_xp = ppm_page2base( rpc_page_xp ); 863 rpc = GET_PTR( rpc_xp ); 864 865 // 2. send parallel RPCs to all existing clusters covered by the DQT 866 // to release memory allocated for the arrays of DQT nodes in each cluster 867 868 uint32_t responses = 0; // initialize RPC responses counter 869 870 // mask IRQs 871 hal_disable_irq( &save_sr); 872 873 // client thread blocks itself 874 thread_block( XPTR( local_cxy , this ) , THREAD_BLOCKED_RPC ); 875 876 uint32_t x , y; 877 878 #if DEBUG_BARRIER_DESTROY 879 if( cycle > DEBUG_BARRIER_DESTROY ) 880 printk("\n[%s] thread[%x,%x] send RPCs to release the distributed dqt_node array\n", 881 __FUNCTION__, this->process->pid, this->trdid ); 882 #endif 883 752 // 1. 
release memory allocated for the DQT nodes 753 // in all clusters covered by the QDT mesh 884 754 for ( x = 0 ; x < x_size ; x++ ) 885 755 { 886 756 for ( y = 0 ; y < y_size ; y++ ) 887 757 { 888 // send RPC to existing cluster only 758 // compute target cluster identifier 759 cxy_t cxy = HAL_CXY_FROM_XY( x , y ); 760 761 // existing cluster only 889 762 if( LOCAL_CLUSTER->cluster_info[x][y] ) 890 763 { 891 // compute target cluster identifier892 cxy_t cxy = HAL_CXY_FROM_XY( x , y );893 894 764 // get local pointer on dqt_nodes array in target cluster 895 765 xptr_t buf_xp_xp = XPTR( barrier_cxy , &barrier_ptr->node_xp[x][y][0] ); … … 899 769 assert( (cxy == GET_CXY(buf_xp)) , "bad extended pointer on dqt_nodes array\n" ); 900 770 901 // build a specific RPC descriptor 902 rpc[cxy].rsp = &responses; 903 rpc[cxy].blocking = false; 904 rpc[cxy].index = RPC_KCM_FREE; 905 rpc[cxy].thread = this; 906 rpc[cxy].lid = this->core->lid; 907 rpc[cxy].args[0] = (uint64_t)(intptr_t)buf; 908 rpc[cxy].args[1] = (uint64_t)KMEM_512_BYTES; 909 910 // atomically increment expected responses counter 911 hal_atomic_add( &responses , 1 ); 912 771 req.type = KMEM_KCM; 772 req.ptr = buf; 773 kmem_remote_free( cxy , &req ); 774 913 775 #if DEBUG_BARRIER_DESTROY 776 thread_t * this = CURRENT_THREAD; 777 uint32_t cycle = (uint32_t)hal_get_cycles(); 914 778 if( cycle > DEBUG_BARRIER_DESTROY ) 915 printk(" - target cluster(%d,%d) / buffer %x\n", x, y, buf ); 916 #endif 917 // send a non-blocking RPC to release 512 bytes in target cluster 918 rpc_send( cxy , &rpc[cxy] ); 779 printk("\n[%s] thread[%x,%x] released node array %x in cluster %x / cycle %d\n", 780 __FUNCTION__, this->process->pid, this->trdid, buf, cxy, cycle ); 781 #endif 919 782 } 920 783 } 921 784 } 922 785 923 // client thread deschedule 924 sched_yield("blocked on parallel rpc_kcm_free"); 925 926 // restore IRQs 927 hal_restore_irq( save_sr); 928 929 // 3. 
release memory locally allocated for the RPC descriptors array 930 req.type = KMEM_PAGE; 931 req.ptr = rpc_page; 932 kmem_free( &req ); 933 934 // 4. release memory allocated for barrier descriptor 935 xptr_t page_xp = ppm_base2page( barrier_xp ); 936 cxy_t page_cxy = GET_CXY( page_xp ); 937 page_t * page_ptr = GET_PTR( page_xp ); 938 939 ppm_remote_free_pages( page_cxy , page_ptr ); 786 // 2. release memory allocated for barrier descriptor in ref cluster 787 req.type = KMEM_PPM; 788 req.ptr = barrier_ptr; 789 kmem_remote_free( barrier_cxy , &req ); 940 790 941 791 #if DEBUG_BARRIER_DESTROY 942 792 cycle = (uint32_t)hal_get_cycles(); 943 793 if( cycle > DEBUG_BARRIER_DESTROY ) 944 printk("\n[%s] thread[%x,%x] exit for barrier (%x,%x) / cycle %d\n",794 printk("\n[%s] thread[%x,%x] release barrier descriptor (%x,%x) / cycle %d\n", 945 795 __FUNCTION__, this->process->pid, this->trdid, barrier_cxy, barrier_ptr, cycle ); 946 796 #endif … … 1022 872 { 1023 873 uint32_t level = hal_remote_l32( XPTR( node_cxy , &node_ptr->level )); 1024 uint32_t arity = hal_remote_l32( XPTR( node_cxy , &node_ptr->arity ));1025 uint32_t count = hal_remote_l32( XPTR( node_cxy , &node_ptr->current ));1026 874 xptr_t pa_xp = hal_remote_l32( XPTR( node_cxy , &node_ptr->parent_xp )); 1027 875 xptr_t c0_xp = hal_remote_l32( XPTR( node_cxy , &node_ptr->child_xp[0] )); … … 1030 878 xptr_t c3_xp = hal_remote_l32( XPTR( node_cxy , &node_ptr->child_xp[3] )); 1031 879 1032 printk(" . level %d : (%x,%x) / %d on %d /P(%x,%x) / C0(%x,%x)"880 printk(" . level %d : (%x,%x) / P(%x,%x) / C0(%x,%x)" 1033 881 " C1(%x,%x) / C2(%x,%x) / C3(%x,%x)\n", 1034 level, node_cxy, node_ptr, count, arity,882 level, node_cxy, node_ptr, 1035 883 GET_CXY(pa_xp), GET_PTR(pa_xp), 1036 884 GET_CXY(c0_xp), GET_PTR(c0_xp),
Note: See TracChangeset for help on using the changeset viewer.