Geant4 Cross Reference |
//
// ********************************************************************
// * License and Disclaimer                                           *
// *                                                                  *
// * The Geant4 software is copyright of the Copyright Holders of     *
// * the Geant4 Collaboration. It is provided under the terms and     *
// * conditions of the Geant4 Software License, included in the file  *
// * LICENSE and available at http://cern.ch/geant4/license . These   *
// * include a list of copyright holders.                             *
// *                                                                  *
// * Neither the authors of this software system, nor their employing *
// * institutes,nor the agencies providing financial support for this *
// * work make any representation or warranty, express or implied,    *
// * regarding this software system or assume any liability for its   *
// * use. Please see the license in the file LICENSE and URL above    *
// * for the full disclaimer and the limitation of liability.         *
// *                                                                  *
// * This code implementation is the result of the scientific and     *
// * technical work of the GEANT4 collaboration.                      *
// * By using, copying, modifying or distributing the software (or    *
// * any work based on the software) you agree to acknowledge its     *
// * use in resulting scientific publications, and indicate your      *
// * acceptance of all terms of the Geant4 Software license.          *
// ********************************************************************
//
#include "G4VUserMPIrunMerger.hh"

#include "G4MPImanager.hh"
#include "G4MPIutils.hh"

#include <algorithm>
#include <assert.h>
#include <functional>
#include <mpi.h>

// Constructor: records the run to be merged and the destination rank.
// No MPI communication happens here; buffers are created lazily in
// Send()/Receive().  Note: `run` is stored non-const because Merge()
// mutates it via G4Run::Merge().
G4VUserMPIrunMerger::G4VUserMPIrunMerger(const G4Run* aRun, G4int destination, G4int ver)
  : outputBuffer(nullptr),
    outputBufferSize(0),
    outputBufferPosition(0),
    ownsBuffer(false),
    destinationRank(destination),
    run(const_cast<G4Run*>(aRun)),
    commSize(0),
    verbose(ver),
    bytesSent(0)
{}

// Debug-message helper: prints MSG only when the verbosity level
// exceeds LVL.  Kept as a macro (rather than a function) so that the
// streaming expression in MSG is not evaluated when suppressed.
#define DMSG(LVL, MSG) \
  { \
    if (verbose > LVL) { \
      G4cout << MSG << G4endl; \
    } \
  }

// Serialize this rank's G4Run (event count + user-registered data) into
// a single MPI::PACKED buffer and send it to `destination`.
// Pack() is the user hook that registers extra payload via
// InputUserData(); the event counter is always appended last, matching
// the unpack order in Receive().
void G4VUserMPIrunMerger::Send(const unsigned int destination)
{
  assert(run != nullptr);
  G4int nevts = run->GetNumberOfEvent();
  DMSG(1, "G4VUserMPIrunMerger::Send() : Sending a G4run (" << run << ") with " << nevts
          << " events to: " << destination);
  input_userdata.clear();
  Pack();  // User code
  InputUserData(&nevts, MPI::INT, 1);

  // Size the send buffer from the registered datatypes, then allocate it.
  DestroyBuffer();
  G4int newbuffsize = 0;
  for (const const_registered_data& el : input_userdata) {
    newbuffsize += (el.dt.Get_size() * el.count);
  }
  char* buffer = new char[newbuffsize];
  // Avoid complains from valgrind (i'm not really sure why this is needed, but, beside the
  // small cpu penalty, we can live with that).)
  std::fill(buffer, buffer + newbuffsize, 0);
  ownsBuffer = true;
  SetupOutputBuffer(buffer, newbuffsize, 0);
  DMSG(3, "Buffer size: " << newbuffsize << " bytes at: " << (void*)outputBuffer);

  // Now userdata contains all data to be send, do the real packing
  for (const const_registered_data& el : input_userdata) {
    // Some MPI implementations declare MPI_Pack's first argument as
    // non-const void*; the build system sets this macro accordingly.
#ifdef G4MPI_USE_MPI_PACK_NOT_CONST
    MPI_Pack(const_cast<void*>(el.p_data), el.count, el.dt,
#else
    MPI_Pack(el.p_data, el.count, el.dt,
#endif
             outputBuffer, outputBufferSize, &outputBufferPosition, COMM_G4COMMAND_);
  }
  // The buffer must be exactly filled: sizes were computed from the same
  // registered-data list that was just packed.
  assert(outputBufferSize == outputBufferPosition);
  COMM_G4COMMAND_.Send(outputBuffer, outputBufferSize, MPI::PACKED, destination,
                       G4MPImanager::kTAG_RUN);
  bytesSent += outputBufferSize;
  DMSG(2, "G4VUserMPIrunMerger::Send() : Done ");
}

// Receive one packed G4Run from rank `source`, unpack it into a
// (possibly user-derived, via UnPack()) G4Run instance, and merge it
// into this rank's `run`.  The unpack order mirrors Send(): user data
// first, event counter last.
void G4VUserMPIrunMerger::Receive(const unsigned int source)
{
  const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm();
  DMSG(1, "G4VUserMPIrunMerger::Receive(...) , this rank : " << parentComm->Get_rank()
          << " and receiving from : " << source);
  // DestroyBuffer();
  // Receive from all but one
  // for (G4int rank = 0; rank < commSize-1; ++rank)
  //{
  // Probe first so the incoming message size is known before Recv;
  // this lets the receive buffer be grown only when needed.
  MPI::Status status;
  COMM_G4COMMAND_.Probe(source, G4MPImanager::kTAG_RUN, status);
  // const G4int source = status.Get_source();
  const G4int newbuffsize = status.Get_count(MPI::PACKED);
  DMSG(2, "Preparing to receive buffer of size: " << newbuffsize);
  char* buffer = outputBuffer;
  if (newbuffsize > outputBufferSize) {
    DMSG(3, "New larger buffer expected, resize");
    // New larger buffer incoming, recreate buffer
    // NOTE(review): outputBuffer is deleted here without checking
    // ownsBuffer — presumably the buffer is always owned at this point;
    // verify against SetupOutputBuffer()'s callers.
    delete[] outputBuffer;
    buffer = new char[newbuffsize];
    // Avoid complains from valgrind (i'm not really sure why this is needed, but, beside the
    // small cpu penalty, we can live with that).)
    std::fill(buffer, buffer + newbuffsize, 0);
    ownsBuffer = true;
  }
  SetupOutputBuffer(buffer, newbuffsize, 0);
  COMM_G4COMMAND_.Recv(buffer, newbuffsize, MPI::PACKED, source, G4MPImanager::kTAG_RUN, status);
  DMSG(3, "Buffer Size: " << outputBufferSize << " bytes at: " << (void*)outputBuffer);
  output_userdata.clear();
  // User code, if implemented will return the concrete G4Run class
  G4Run* aNewRun = UnPack();
  if (aNewRun == nullptr) aNewRun = new G4Run;
  // Add number of events counter
  G4int nevets = 0;
  OutputUserData(&nevets, MPI::INT, 1);
  // now userdata contains all data references, do the real unpacking
  for (const registered_data& el : output_userdata) {
    MPI_Unpack(outputBuffer, outputBufferSize, &outputBufferPosition, el.p_data, el.count, el.dt,
               COMM_G4COMMAND_);
  }
  // Replay the event count so aNewRun's internal counter matches the
  // sender's; the null argument records an event without event data.
  for (G4int i = 0; i < nevets; ++i)
    aNewRun->RecordEvent(nullptr);

  // Now merge received MPI run with global one
  DMSG(2, "Before G4Run::Merge : " << run->GetNumberOfEvent());
  run->Merge(aNewRun);
  DMSG(2, "After G4Run::Merge : " << run->GetNumberOfEvent());
  delete aNewRun;
  //}
}

// Entry point: merge the G4Run objects of all active MPI ranks into the
// run held by `destinationRank`.  The actual communication pattern
// (who sends to whom, in which round) is delegated to G4mpi::Merge(),
// which calls back into Send()/Receive() above.
void G4VUserMPIrunMerger::Merge()
{
  // G4cout << "G4VUserMPIrunMerger::Merge called" << G4endl;

  DMSG(0, "G4VUserMPIrunMerger::Merge called");
  const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm();
  const unsigned int myrank = parentComm->Get_rank();
  commSize = G4MPImanager::GetManager()->GetActiveSize();
  // do not include extra worker in this communication

  if (commSize == 1) {
    DMSG(1, "Comm world size is 1, nothing to do");
    return;
  }
  // Duplicate the parent communicator so this merge traffic cannot
  // collide with other messages on the parent communicator.
  COMM_G4COMMAND_ = parentComm->Dup();
  bytesSent = 0;
  const G4double sttime = MPI::Wtime();

  // Use G4MPIutils to optimize communications between ranks
  typedef std::function<void(unsigned int)> handler_t;
  using std::placeholders::_1;
  handler_t sender = std::bind(&G4VUserMPIrunMerger::Send, this, _1);
  handler_t receiver = std::bind(&G4VUserMPIrunMerger::Receive, this, _1);
  std::function<void(void)> barrier = std::bind(&MPI::Intracomm::Barrier, &COMM_G4COMMAND_);
  // G4cout << "go to G4mpi::Merge" << G4endl;
  G4mpi::Merge(sender, receiver, barrier, commSize, myrank);

  // OLD Style p2p communications
  /*
  if ( myrank != destinationRank ) {
      DMSG(0,"Comm world size: "<<commSize<<" this rank is: "
           <<myrank<<" sending to rank "<<destinationRank);
      Send(destinationRank);
  } else {
      DMSG(1,"Comm world size: "<<commSize<<" this rank is: "
           <<myrank<<" receiving. ");
      for ( unsigned int i = 0 ; i<commSize ; ++i) {
          if ( i != myrank ) Receive(i);
      }
  }
  */
  // Report aggregate transfer statistics on the destination rank only.
  // `bytesSent` is reduced with MPI::LONG — assumes it is declared as a
  // C `long` in the header (TODO confirm).
  const G4double elapsed = MPI::Wtime() - sttime;
  long total = 0;
  COMM_G4COMMAND_.Reduce(&bytesSent, &total, 1, MPI::LONG, MPI::SUM, destinationRank);
  if (verbose > 0 && myrank == destinationRank) {
    // Collect from ranks how much data was sent around
    G4cout << "G4VUserMPIrunMerger::Merge() - data transfer performances: "
           << double(total) / 1000. / elapsed << " kB/s"
           << " (Total Data Transfer= " << double(total) / 1000 << " kB in " << elapsed << " s)."
           << G4endl;
  }

  COMM_G4COMMAND_.Free();
  DMSG(0, "G4VUserMPIrunMerger::Merge done");
}