//
// ********************************************************************
// * License and Disclaimer                                           *
// *                                                                  *
// * The  Geant4 software  is  copyright of the Copyright Holders  of *
// * the Geant4 Collaboration.  It is provided  under  the terms  and *
// * conditions of the Geant4 Software License,  included in the file *
// * LICENSE and available at  http://cern.ch/geant4/license .  These *
// * include a list of copyright holders.                             *
// *                                                                  *
// * Neither the authors of this software system, nor their employing *
// * institutes,nor the agencies providing financial support for this *
// * work  make  any representation or  warranty, express or implied, *
// * regarding  this  software system or assume any liability for its *
// * use.  Please see the license in the file  LICENSE  and URL above *
// * for the full disclaimer and the limitation of liability.         *
// *                                                                  *
// * This  code  implementation is the result of  the  scientific and *
// * technical work of the GEANT4 collaboration.                      *
// * By using,  copying,  modifying or  distributing the software (or *
// * any work based  on the software)  you  agree  to acknowledge its *
// * use  in  resulting  scientific  publications,  and indicate your *
// * acceptance of all terms of the Geant4 Software license.          *
// ********************************************************************
//
#include "G4VUserMPIrunMerger.hh"

#include "G4MPImanager.hh"
#include "G4MPIutils.hh"

#include <algorithm>
#include <assert.h>
#include <functional>
#include <mpi.h>

G4VUserMPIrunMerger::G4VUserMPIrunMerger(const G4Run* aRun, G4int destination, G4int ver)
  : outputBuffer(nullptr),
    outputBufferSize(0),
    outputBufferPosition(0),
    ownsBuffer(false),
    destinationRank(destination),
    run(const_cast<G4Run*>(aRun)),
    commSize(0),
    verbose(ver),
    bytesSent(0)
{}

#define DMSG(LVL, MSG) \
  { \
    if (verbose > LVL) { \
      G4cout << MSG << G4endl; \
    } \
  }

void G4VUserMPIrunMerger::Send(const unsigned int destination)
{
  assert(run != nullptr);
  G4int nevts = run->GetNumberOfEvent();
  DMSG(1, "G4VUserMPIrunMerger::Send() : Sending a G4Run (" << run << ") with " << nevts
            << " events to rank " << destination);

  input_userdata.clear();
  Pack();  // User code
  InputUserData(&nevts, MPI::INT, 1);

  DestroyBuffer();
  G4int newbuffsize = 0;
  for (const const_registered_data& el : input_userdata) {
    newbuffsize += (el.dt.Get_size() * el.count);
  }
  char* buffer = new char[newbuffsize];
  // Zero the buffer to avoid complaints from valgrind (I'm not really sure why
  // this is needed but, besides a small CPU penalty, we can live with that.)
  std::fill(buffer, buffer + newbuffsize, 0);
  ownsBuffer = true;
  SetupOutputBuffer(buffer, newbuffsize, 0);
  DMSG(3, "Buffer size: " << newbuffsize << " bytes at: " << (void*)outputBuffer);

  // Now userdata contains all the data to be sent: do the actual packing
  for (const const_registered_data& el : input_userdata) {
#ifdef G4MPI_USE_MPI_PACK_NOT_CONST
    MPI_Pack(const_cast<void*>(el.p_data), el.count, el.dt,
#else
    MPI_Pack(el.p_data, el.count, el.dt,
#endif
             outputBuffer, outputBufferSize, &outputBufferPosition, COMM_G4COMMAND_);
  }
  assert(outputBufferSize == outputBufferPosition);
  COMM_G4COMMAND_.Send(outputBuffer, outputBufferSize, MPI::PACKED, destination,
                       G4MPImanager::kTAG_RUN);
  bytesSent += outputBufferSize;
  DMSG(2, "G4VUserMPIrunMerger::Send() : Done");
}
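
// Note: Send() and Receive() implement a symmetric wire protocol. Send()
// packs every block registered by the user's Pack() (via InputUserData),
// followed by the event counter; Receive() registers the matching
// destinations (via UnPack()/OutputUserData) and unpacks them in the same
// order, as MPI_Pack/MPI_Unpack require identical ordering on both sides.
// A hedged sketch of a user subclass is given at the end of this file.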
void G4VUserMPIrunMerger::Receive(const unsigned int source)
{
  const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm();
  DMSG(1, "G4VUserMPIrunMerger::Receive(...) : this rank is " << parentComm->Get_rank()
            << ", receiving from rank " << source);

  // DestroyBuffer();
  // Receive from all but one
  // for (G4int rank = 0; rank < commSize-1; ++rank)
  //{
  MPI::Status status;
  COMM_G4COMMAND_.Probe(source, G4MPImanager::kTAG_RUN, status);
  // const G4int source = status.Get_source();
  const G4int newbuffsize = status.Get_count(MPI::PACKED);
  DMSG(2, "Preparing to receive buffer of size: " << newbuffsize);
  char* buffer = outputBuffer;
  if (newbuffsize > outputBufferSize) {
    DMSG(3, "New larger buffer expected, resize");
    // A larger buffer is incoming: recreate the receive buffer
    delete[] outputBuffer;
    buffer = new char[newbuffsize];
    // Zero the buffer to avoid complaints from valgrind (I'm not really sure why
    // this is needed but, besides a small CPU penalty, we can live with that.)
    std::fill(buffer, buffer + newbuffsize, 0);
    ownsBuffer = true;
  }
  SetupOutputBuffer(buffer, newbuffsize, 0);
  COMM_G4COMMAND_.Recv(buffer, newbuffsize, MPI::PACKED, source, G4MPImanager::kTAG_RUN,
                       status);
  DMSG(3, "Buffer Size: " << outputBufferSize << " bytes at: " << (void*)outputBuffer);

  output_userdata.clear();
  // User code: if implemented, UnPack() returns the user's concrete G4Run subclass
  G4Run* aNewRun = UnPack();
  if (aNewRun == nullptr) aNewRun = new G4Run;
  // Add the number-of-events counter
  G4int nevts = 0;
  OutputUserData(&nevts, MPI::INT, 1);
  // Now userdata contains all the data references: do the actual unpacking
  for (const registered_data& el : output_userdata) {
    MPI_Unpack(outputBuffer, outputBufferSize, &outputBufferPosition, el.p_data, el.count,
               el.dt, COMM_G4COMMAND_);
  }
  for (G4int i = 0; i < nevts; ++i)
    aNewRun->RecordEvent(nullptr);

  // Now merge the received run into the global one
  DMSG(2, "Before G4Run::Merge : " << run->GetNumberOfEvent());
  run->Merge(aNewRun);
  DMSG(2, "After G4Run::Merge : " << run->GetNumberOfEvent());
  delete aNewRun;
  //}
}

void G4VUserMPIrunMerger::Merge()
{
  // G4cout << "G4VUserMPIrunMerger::Merge called" << G4endl;

  DMSG(0, "G4VUserMPIrunMerger::Merge called");
  const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm();
  const unsigned int myrank = parentComm->Get_rank();
  commSize = G4MPImanager::GetManager()->GetActiveSize();
  // Do not include the extra worker in this communication

  if (commSize == 1) {
    DMSG(1, "Comm world size is 1, nothing to do");
    return;
  }
  COMM_G4COMMAND_ = parentComm->Dup();
  bytesSent = 0;
  const G4double sttime = MPI::Wtime();

  // Use G4MPIutils to optimize communications between ranks
  typedef std::function<void(unsigned int)> handler_t;
  using std::placeholders::_1;
  handler_t sender = std::bind(&G4VUserMPIrunMerger::Send, this, _1);
  handler_t receiver = std::bind(&G4VUserMPIrunMerger::Receive, this, _1);
  std::function<void(void)> barrier = std::bind(&MPI::Intracomm::Barrier, &COMM_G4COMMAND_);
  // G4cout << "go to G4mpi::Merge" << G4endl;
  G4mpi::Merge(sender, receiver, barrier, commSize, myrank);

  // OLD style p2p communications
  /*
  if ( myrank != destinationRank ) {
    DMSG(0,"Comm world size: "<<commSize<<" this rank is: "
          <<myrank<<" sending to rank "<<destinationRank);
    Send(destinationRank);
  } else {
    DMSG(1,"Comm world size: "<<commSize<<" this rank is: "
          <<myrank<<" receiving. ");
    for ( unsigned int i = 0 ; i < commSize ; ++i ) {
      if ( i != myrank ) Receive(i);
    }
  }
  */
  const G4double elapsed = MPI::Wtime() - sttime;
  long total = 0;
  COMM_G4COMMAND_.Reduce(&bytesSent, &total, 1, MPI::LONG, MPI::SUM, destinationRank);
  if (verbose > 0 && myrank == destinationRank) {
    // Report how much data was sent around and how fast
    G4cout << "G4VUserMPIrunMerger::Merge() - data transfer performance: "
           << double(total) / 1000. / elapsed << " kB/s"
           << " (Total Data Transfer = " << double(total) / 1000. << " kB in "
           << elapsed << " s)." << G4endl;
  }

  COMM_G4COMMAND_.Free();
  DMSG(0, "G4VUserMPIrunMerger::Merge done");
}
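
// ----------------------------------------------------------------------
// Illustration only (not part of the class, kept commented out like the
// old-style p2p block above): a minimal sketch of a user subclass that
// merges one extra accumulable across ranks. The names MyRunMerger, MyRun
// and fDose are hypothetical; the Pack()/UnPack()/InputUserData()/
// OutputUserData() signatures are assumed from their use in this file, and
// the sketch assumes the 'run' pointer and both hooks are accessible to
// subclasses. See G4VUserMPIrunMerger.hh for the authoritative declarations.
/*
#include "MyRun.hh"  // hypothetical G4Run subclass with a public G4double fDose

class MyRunMerger : public G4VUserMPIrunMerger
{
  public:
    MyRunMerger(const MyRun* aRun, G4int destination, G4int ver)
      : G4VUserMPIrunMerger(aRun, destination, ver)
    {}

  protected:
    // Called by Send(): register each user data member to be packed,
    // in a fixed order.
    void Pack() override
    {
      InputUserData(&(static_cast<MyRun*>(run)->fDose), MPI::DOUBLE, 1);
    }

    // Called by Receive(): create the run that will hold the unpacked
    // data and register the destination addresses in the same order.
    G4Run* UnPack() override
    {
      MyRun* aRun = new MyRun;
      OutputUserData(&(aRun->fDose), MPI::DOUBLE, 1);
      return aRun;
    }
};
*/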