Geant4 Cross Reference |
1 // 1 // 2 // ******************************************* 2 // ******************************************************************** 3 // * License and Disclaimer 3 // * License and Disclaimer * 4 // * 4 // * * 5 // * The Geant4 software is copyright of th 5 // * The Geant4 software is copyright of the Copyright Holders of * 6 // * the Geant4 Collaboration. It is provided 6 // * the Geant4 Collaboration. It is provided under the terms and * 7 // * conditions of the Geant4 Software License 7 // * conditions of the Geant4 Software License, included in the file * 8 // * LICENSE and available at http://cern.ch/ 8 // * LICENSE and available at http://cern.ch/geant4/license . These * 9 // * include a list of copyright holders. 9 // * include a list of copyright holders. * 10 // * 10 // * * 11 // * Neither the authors of this software syst 11 // * Neither the authors of this software system, nor their employing * 12 // * institutes,nor the agencies providing fin 12 // * institutes,nor the agencies providing financial support for this * 13 // * work make any representation or warran 13 // * work make any representation or warranty, express or implied, * 14 // * regarding this software system or assum 14 // * regarding this software system or assume any liability for its * 15 // * use. Please see the license in the file 15 // * use. Please see the license in the file LICENSE and URL above * 16 // * for the full disclaimer and the limitatio 16 // * for the full disclaimer and the limitation of liability. * 17 // * 17 // * * 18 // * This code implementation is the result 18 // * This code implementation is the result of the scientific and * 19 // * technical work of the GEANT4 collaboratio 19 // * technical work of the GEANT4 collaboration. * 20 // * By using, copying, modifying or distri 20 // * By using, copying, modifying or distributing the software (or * 21 // * any work based on the software) you ag 21 // * any work based on the software) you agree to acknowledge its * 22 // * use in resulting scientific publicati 22 // * use in resulting scientific publications, and indicate your * 23 // * acceptance of all terms of the Geant4 Sof 23 // * acceptance of all terms of the Geant4 Software license. * 24 // ******************************************* 24 // ******************************************************************** 25 // 25 // 26 #include "G4VUserMPIrunMerger.hh" 26 #include "G4VUserMPIrunMerger.hh" 27 << 27 #include <mpi.h> 28 #include "G4MPImanager.hh" << 29 #include "G4MPIutils.hh" << 30 << 31 #include <algorithm> << 32 #include <assert.h> 28 #include <assert.h> >> 29 #include <algorithm> 33 #include <functional> 30 #include <functional> 34 #include <mpi.h> << 31 #include "G4MPIutils.hh" 35 32 36 G4VUserMPIrunMerger::G4VUserMPIrunMerger(const << 33 G4VUserMPIrunMerger::G4VUserMPIrunMerger( const G4Run* aRun , 37 : outputBuffer(nullptr), << 34 G4int destination , 38 outputBufferSize(0), << 35 G4int ver) : 39 outputBufferPosition(0), << 36 outputBuffer(nullptr),outputBufferSize(0),outputBufferPosition(0), 40 ownsBuffer(false), 37 ownsBuffer(false), 41 destinationRank(destination), << 38 destinationRank(destination), 42 run(const_cast<G4Run*>(aRun)), << 39 run(const_cast<G4Run*>(aRun)), 43 commSize(0), << 40 commSize(0), 44 verbose(ver), << 41 verbose(ver), 45 bytesSent(0) << 42 bytesSent(0) {} 46 {} << 43 47 << 44 #define DMSG( LVL , MSG ) { if ( verbose > LVL ) { G4cout << MSG << G4endl; } } 48 #define DMSG(LVL, MSG) \ << 49 { \ << 50 if (verbose > LVL) { \ << 51 G4cout << MSG << G4endl; \ << 52 } \ << 53 } << 54 45 55 void G4VUserMPIrunMerger::Send(const unsigned 46 void G4VUserMPIrunMerger::Send(const unsigned int destination) 56 { 47 { 57 assert(run != nullptr); << 48 assert(run!=nullptr); 58 G4int nevts = run->GetNumberOfEvent(); 49 G4int nevts = run->GetNumberOfEvent(); 59 DMSG(1, "G4VUserMPIrunMerger::Send() : Sendi << 50 DMSG( 1 , "G4VUserMPIrunMerger::Send() : Sending a G4run (" 60 << 51 <<run<<") with "<<nevts<<" events to: "<<destination); 61 input_userdata.clear(); 52 input_userdata.clear(); 62 Pack(); // User code << 53 Pack();//User code 63 InputUserData(&nevts, MPI::INT, 1); << 54 InputUserData(&nevts,MPI::INT,1); 64 55 65 DestroyBuffer(); 56 DestroyBuffer(); 66 G4int newbuffsize = 0; 57 G4int newbuffsize = 0; 67 for (const const_registered_data& el : input << 58 for ( const const_registered_data& el : input_userdata ) { 68 newbuffsize += (el.dt.Get_size() * el.coun << 59 newbuffsize += (el.dt.Get_size()*el.count); 69 } 60 } 70 char* buffer = new char[newbuffsize]; 61 char* buffer = new char[newbuffsize]; 71 // Avoid complains from valgrind (i'm not re << 62 //Avoid complains from valgrind (i'm not really sure why this is needed, but, beside the 72 // small cpu penalty, we can live with that) << 63 //small cpu penalty, we can live with that).) 73 std::fill(buffer, buffer + newbuffsize, 0); << 64 std::fill(buffer,buffer+newbuffsize,0); 74 ownsBuffer = true; << 65 ownsBuffer=true; 75 SetupOutputBuffer(buffer, newbuffsize, 0); << 66 SetupOutputBuffer(buffer,newbuffsize,0); 76 DMSG(3, "Buffer size: " << newbuffsize << " << 67 DMSG(3,"Buffer size: "<<newbuffsize<<" bytes at: "<<(void*)outputBuffer); 77 68 78 // Now userdata contains all data to be send << 69 //Now userdata contains all data to be send, do the real packing 79 for (const const_registered_data& el : input << 70 for ( const const_registered_data& el : input_userdata ) { 80 #ifdef G4MPI_USE_MPI_PACK_NOT_CONST 71 #ifdef G4MPI_USE_MPI_PACK_NOT_CONST 81 MPI_Pack(const_cast<void*>(el.p_data), el. << 72 MPI_Pack(const_cast<void*>(el.p_data),el.count,el.dt, 82 #else 73 #else 83 MPI_Pack(el.p_data, el.count, el.dt, << 74 MPI_Pack(el.p_data,el.count,el.dt, 84 #endif 75 #endif 85 outputBuffer, outputBufferSize, & << 76 outputBuffer,outputBufferSize, >> 77 &outputBufferPosition,COMM_G4COMMAND_); 86 } 78 } 87 assert(outputBufferSize == outputBufferPosit << 79 assert(outputBufferSize==outputBufferPosition); 88 COMM_G4COMMAND_.Send(outputBuffer, outputBuf << 80 COMM_G4COMMAND_.Send(outputBuffer , outputBufferSize , MPI::PACKED , 89 G4MPImanager::kTAG_RUN) << 81 destination , G4MPImanager::kTAG_RUN); 90 bytesSent += outputBufferSize; << 82 bytesSent+=outputBufferSize; 91 DMSG(2, "G4VUserMPIrunMerger::Send() : Done << 83 DMSG(2 , "G4VUserMPIrunMerger::Send() : Done "); 92 } 84 } 93 85 >> 86 94 void G4VUserMPIrunMerger::Receive(const unsign 87 void G4VUserMPIrunMerger::Receive(const unsigned int source) 95 { 88 { 96 const MPI::Intracomm* parentComm = G4MPImana << 89 DMSG( 1 , "G4VUserMPIrunMerger::Receive(...) , this rank : " 97 DMSG(1, "G4VUserMPIrunMerger::Receive(...) , << 90 <<MPI::COMM_WORLD.Get_rank()<<" and receiving from : "<<source); 98 << 91 //DestroyBuffer(); 99 // DestroyBuffer(); << 92 //Receive from all but one 100 // Receive from all but one << 93 //for (G4int rank = 0; rank < commSize-1; ++rank) 101 // for (G4int rank = 0; rank < commSize-1; + << 94 //{ 102 //{ << 95 MPI::Status status; 103 MPI::Status status; << 96 COMM_G4COMMAND_.Probe(source, G4MPImanager::kTAG_RUN, status); 104 COMM_G4COMMAND_.Probe(source, G4MPImanager:: << 97 //const G4int source = status.Get_source(); 105 // const G4int source = status.Get_source(); << 98 const G4int newbuffsize = status.Get_count(MPI::PACKED); 106 const G4int newbuffsize = status.Get_count(M << 99 DMSG(2,"Preparing to receive buffer of size: "<<newbuffsize); 107 DMSG(2, "Preparing to receive buffer of size << 100 char* buffer = outputBuffer; 108 char* buffer = outputBuffer; << 101 if ( newbuffsize > outputBufferSize ) { 109 if (newbuffsize > outputBufferSize) { << 102 DMSG(3,"New larger buffer expected, resize"); 110 DMSG(3, "New larger buffer expected, resiz << 103 //New larger buffer incoming, recreate buffer 111 // New larger buffer incoming, recreate bu << 104 delete[] outputBuffer; 112 delete[] outputBuffer; << 105 buffer = new char[newbuffsize]; 113 buffer = new char[newbuffsize]; << 106 //Avoid complains from valgrind (i'm not really sure why this is needed, but, beside the 114 // Avoid complains from valgrind (i'm not << 107 //small cpu penalty, we can live with that).) 115 // small cpu penalty, we can live with tha << 108 std::fill(buffer,buffer+newbuffsize,0); 116 std::fill(buffer, buffer + newbuffsize, 0) << 109 ownsBuffer = true; 117 ownsBuffer = true; << 110 } 118 } << 111 SetupOutputBuffer(buffer,newbuffsize,0); 119 SetupOutputBuffer(buffer, newbuffsize, 0); << 112 COMM_G4COMMAND_.Recv(buffer, newbuffsize, MPI::PACKED,source, 120 COMM_G4COMMAND_.Recv(buffer, newbuffsize, MP << 113 G4MPImanager::kTAG_RUN, status); 121 DMSG(3, "Buffer Size: " << outputBufferSize << 114 DMSG(3,"Buffer Size: "<<outputBufferSize<< " bytes at: "<<(void*)outputBuffer); 122 output_userdata.clear(); << 115 output_userdata.clear(); 123 // User code, if implemented will return the << 116 //User code, if implemented will return the concrete G4Run class 124 G4Run* aNewRun = UnPack(); << 117 G4Run* aNewRun = UnPack(); 125 if (aNewRun == nullptr) aNewRun = new G4Run; << 118 if ( aNewRun == nullptr ) aNewRun = new G4Run; 126 // Add number of events counter << 119 //Add number of events counter 127 G4int nevets = 0; << 120 G4int nevets = 0; 128 OutputUserData(&nevets, MPI::INT, 1); << 121 OutputUserData(&nevets,MPI::INT,1); 129 // now userdata contains all data references << 122 //now userdata contains all data references, do the real unpacking 130 for (const registered_data& el : output_user << 123 for ( const registered_data& el : output_userdata ) { 131 MPI_Unpack(outputBuffer, outputBufferSize, << 124 MPI_Unpack(outputBuffer,outputBufferSize,&outputBufferPosition, 132 COMM_G4COMMAND_); << 125 el.p_data,el.count,el.dt,COMM_G4COMMAND_); 133 } << 126 } 134 for (G4int i = 0; i < nevets; ++i) << 127 for ( G4int i = 0 ; i<nevets ; ++i ) aNewRun->RecordEvent(nullptr); 135 aNewRun->RecordEvent(nullptr); << 128 136 << 129 //Now merge received MPI run with global one 137 // Now merge received MPI run with global on << 130 DMSG(2,"Before G4Run::Merge : "<<run->GetNumberOfEvent()); 138 DMSG(2, "Before G4Run::Merge : " << run->Get << 131 run->Merge( aNewRun ); 139 run->Merge(aNewRun); << 132 DMSG(2,"After G4Run::Merge : "<<run->GetNumberOfEvent()); 140 DMSG(2, "After G4Run::Merge : " << run->GetN << 133 delete aNewRun; 141 delete aNewRun; << 134 //} 142 //} << 143 } 135 } 144 136 145 void G4VUserMPIrunMerger::Merge() 137 void G4VUserMPIrunMerger::Merge() 146 { 138 { 147 // G4cout << "G4VUserMPIrunMerger::Merge cal << 148 << 149 DMSG(0, "G4VUserMPIrunMerger::Merge called") 139 DMSG(0, "G4VUserMPIrunMerger::Merge called"); 150 const MPI::Intracomm* parentComm = G4MPImana << 140 const unsigned int myrank = MPI::COMM_WORLD.Get_rank(); 151 const unsigned int myrank = parentComm->Get_ << 141 commSize = MPI::COMM_WORLD.Get_size(); 152 commSize = G4MPImanager::GetManager()->GetAc << 142 if ( commSize == 1 ) { 153 // do not include extra worker in this commu << 143 DMSG(1,"Comm world size is 1, nothing to do"); 154 << 144 return; 155 if (commSize == 1) { << 156 DMSG(1, "Comm world size is 1, nothing to << 157 return; << 158 } 145 } 159 COMM_G4COMMAND_ = parentComm->Dup(); << 146 COMM_G4COMMAND_ = MPI::COMM_WORLD.Dup(); 160 bytesSent = 0; 147 bytesSent = 0; 161 const G4double sttime = MPI::Wtime(); 148 const G4double sttime = MPI::Wtime(); 162 149 163 // Use G4MPIutils to optimize communications << 150 //Use G4MPIutils to optimize communications between ranks 164 typedef std::function<void(unsigned int)> ha 151 typedef std::function<void(unsigned int)> handler_t; 165 using std::placeholders::_1; 152 using std::placeholders::_1; 166 handler_t sender = std::bind(&G4VUserMPIrunM << 153 handler_t sender = std::bind(&G4VUserMPIrunMerger::Send , this , _1); 167 handler_t receiver = std::bind(&G4VUserMPIru 154 handler_t receiver = std::bind(&G4VUserMPIrunMerger::Receive, this, _1); 168 std::function<void(void)> barrier = std::bin << 155 std::function<void(void)> barrier = 169 // G4cout << "go to G4mpi::Merge" << G4endl << 156 std::bind(&MPI::Intracomm::Barrier,&COMM_G4COMMAND_); 170 G4mpi::Merge(sender, receiver, barrier, comm << 157 G4mpi::Merge( sender , receiver , barrier , commSize , myrank ); 171 << 158 172 // OLD Style p2p communications << 159 //OLD Style p2p communications 173 /* << 160 /* 174 if ( myrank != destinationRank ) { << 161 if ( myrank != destinationRank ) { 175 DMSG(0,"Comm world size: "<<commSize<< << 162 DMSG(0,"Comm world size: "<<commSize<<" this rank is: " 176 <<myrank<<" sending to rank "<<de << 163 <<myrank<<" sending to rank "<<destinationRank); 177 Send(destinationRank); << 164 Send(destinationRank); 178 } else { << 165 } else { 179 DMSG(1,"Comm world size: "<<commSize<< << 166 DMSG(1,"Comm world size: "<<commSize<<" this rank is: " 180 <<myrank<<" receiving. "); << 167 <<myrank<<" receiving. "); 181 for ( unsigned int i = 0 ; i<commSize << 168 for ( unsigned int i = 0 ; i<commSize ; ++i) { 182 if ( i != myrank ) Receive(i); << 169 if ( i != myrank ) Receive(i); 183 } << 170 } 184 } << 171 } 185 */ << 172 */ >> 173 186 const G4double elapsed = MPI::Wtime() - stti 174 const G4double elapsed = MPI::Wtime() - sttime; 187 long total = 0; << 175 long total=0; 188 COMM_G4COMMAND_.Reduce(&bytesSent, &total, 1 << 176 COMM_G4COMMAND_.Reduce(&bytesSent,&total,1,MPI::LONG,MPI::SUM, 189 if (verbose > 0 && myrank == destinationRank << 177 destinationRank); 190 // Collect from ranks how much data was se << 178 if ( verbose > 0 && myrank == destinationRank ) { 191 G4cout << "G4VUserMPIrunMerger::Merge() - << 179 //Collect from ranks how much data was sent around 192 << double(total) / 1000. / elapsed << 180 G4cout<<"G4VUserMPIrunMerger::Merge() - data transfer performances: " 193 << " (Total Data Transfer= " << dou << 181 <<double(total)/1000./elapsed<<" kB/s" 194 << G4endl; << 182 <<" (Total Data Transfer= "<<double(total)/1000<<" kB in " >> 183 <<elapsed<<" s)."<<G4endl; 195 } 184 } 196 185 >> 186 >> 187 197 COMM_G4COMMAND_.Free(); 188 COMM_G4COMMAND_.Free(); 198 DMSG(0, "G4VUserMPIrunMerger::Merge done"); << 189 DMSG(0,"G4VUserMPIrunMerger::Merge done"); 199 } 190 } >> 191 200 192