// Geant4 Cross Reference
//
// ********************************************************************
// * License and Disclaimer                                           *
// *                                                                  *
// * The Geant4 software is copyright of the Copyright Holders of     *
// * the Geant4 Collaboration. It is provided under the terms and     *
// * conditions of the Geant4 Software License, included in the file  *
// * LICENSE and available at http://cern.ch/geant4/license . These   *
// * include a list of copyright holders.                             *
// *                                                                  *
// * Neither the authors of this software system, nor their employing *
// * institutes,nor the agencies providing financial support for this *
// * work make any representation or warranty, express or implied,    *
// * regarding this software system or assume any liability for its   *
// * use. Please see the license in the file LICENSE and URL above    *
// * for the full disclaimer and the limitation of liability.         *
// *                                                                  *
// * This code implementation is the result of the scientific and     *
// * technical work of the GEANT4 collaboration.                      *
// * By using, copying, modifying or distributing the software (or    *
// * any work based on the software) you agree to acknowledge its     *
// * use in resulting scientific publications, and indicate your      *
// * acceptance of all terms of the Geant4 Software license.          *
* 24 // ******************************************************************** 25 // 26 #include "G4VUserMPIrunMerger.hh" 27 28 #include "G4MPImanager.hh" 29 #include "G4MPIutils.hh" 30 31 #include <algorithm> 32 #include <assert.h> 33 #include <functional> 34 #include <mpi.h> 35 36 G4VUserMPIrunMerger::G4VUserMPIrunMerger(const G4Run* aRun, G4int destination, G4int ver) 37 : outputBuffer(nullptr), 38 outputBufferSize(0), 39 outputBufferPosition(0), 40 ownsBuffer(false), 41 destinationRank(destination), 42 run(const_cast<G4Run*>(aRun)), 43 commSize(0), 44 verbose(ver), 45 bytesSent(0) 46 {} 47 48 #define DMSG(LVL, MSG) \ 49 { \ 50 if (verbose > LVL) { \ 51 G4cout << MSG << G4endl; \ 52 } \ 53 } 54 55 void G4VUserMPIrunMerger::Send(const unsigned int destination) 56 { 57 assert(run != nullptr); 58 G4int nevts = run->GetNumberOfEvent(); 59 DMSG(1, "G4VUserMPIrunMerger::Send() : Sending a G4run (" << run << ") with " << nevts 60 << " events to: " << destination); 61 input_userdata.clear(); 62 Pack(); // User code 63 InputUserData(&nevts, MPI::INT, 1); 64 65 DestroyBuffer(); 66 G4int newbuffsize = 0; 67 for (const const_registered_data& el : input_userdata) { 68 newbuffsize += (el.dt.Get_size() * el.count); 69 } 70 char* buffer = new char[newbuffsize]; 71 // Avoid complains from valgrind (i'm not really sure why this is needed, but, beside the 72 // small cpu penalty, we can live with that).) 
73 std::fill(buffer, buffer + newbuffsize, 0); 74 ownsBuffer = true; 75 SetupOutputBuffer(buffer, newbuffsize, 0); 76 DMSG(3, "Buffer size: " << newbuffsize << " bytes at: " << (void*)outputBuffer); 77 78 // Now userdata contains all data to be send, do the real packing 79 for (const const_registered_data& el : input_userdata) { 80 #ifdef G4MPI_USE_MPI_PACK_NOT_CONST 81 MPI_Pack(const_cast<void*>(el.p_data), el.count, el.dt, 82 #else 83 MPI_Pack(el.p_data, el.count, el.dt, 84 #endif 85 outputBuffer, outputBufferSize, &outputBufferPosition, COMM_G4COMMAND_); 86 } 87 assert(outputBufferSize == outputBufferPosition); 88 COMM_G4COMMAND_.Send(outputBuffer, outputBufferSize, MPI::PACKED, destination, 89 G4MPImanager::kTAG_RUN); 90 bytesSent += outputBufferSize; 91 DMSG(2, "G4VUserMPIrunMerger::Send() : Done "); 92 } 93 94 void G4VUserMPIrunMerger::Receive(const unsigned int source) 95 { 96 const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm(); 97 DMSG(1, "G4VUserMPIrunMerger::Receive(...) , this rank : " << parentComm->Get_rank() 98 << " and receiving from : " << source); 99 // DestroyBuffer(); 100 // Receive from all but one 101 // for (G4int rank = 0; rank < commSize-1; ++rank) 102 //{ 103 MPI::Status status; 104 COMM_G4COMMAND_.Probe(source, G4MPImanager::kTAG_RUN, status); 105 // const G4int source = status.Get_source(); 106 const G4int newbuffsize = status.Get_count(MPI::PACKED); 107 DMSG(2, "Preparing to receive buffer of size: " << newbuffsize); 108 char* buffer = outputBuffer; 109 if (newbuffsize > outputBufferSize) { 110 DMSG(3, "New larger buffer expected, resize"); 111 // New larger buffer incoming, recreate buffer 112 delete[] outputBuffer; 113 buffer = new char[newbuffsize]; 114 // Avoid complains from valgrind (i'm not really sure why this is needed, but, beside the 115 // small cpu penalty, we can live with that).) 
116 std::fill(buffer, buffer + newbuffsize, 0); 117 ownsBuffer = true; 118 } 119 SetupOutputBuffer(buffer, newbuffsize, 0); 120 COMM_G4COMMAND_.Recv(buffer, newbuffsize, MPI::PACKED, source, G4MPImanager::kTAG_RUN, status); 121 DMSG(3, "Buffer Size: " << outputBufferSize << " bytes at: " << (void*)outputBuffer); 122 output_userdata.clear(); 123 // User code, if implemented will return the concrete G4Run class 124 G4Run* aNewRun = UnPack(); 125 if (aNewRun == nullptr) aNewRun = new G4Run; 126 // Add number of events counter 127 G4int nevets = 0; 128 OutputUserData(&nevets, MPI::INT, 1); 129 // now userdata contains all data references, do the real unpacking 130 for (const registered_data& el : output_userdata) { 131 MPI_Unpack(outputBuffer, outputBufferSize, &outputBufferPosition, el.p_data, el.count, el.dt, 132 COMM_G4COMMAND_); 133 } 134 for (G4int i = 0; i < nevets; ++i) 135 aNewRun->RecordEvent(nullptr); 136 137 // Now merge received MPI run with global one 138 DMSG(2, "Before G4Run::Merge : " << run->GetNumberOfEvent()); 139 run->Merge(aNewRun); 140 DMSG(2, "After G4Run::Merge : " << run->GetNumberOfEvent()); 141 delete aNewRun; 142 //} 143 } 144 145 void G4VUserMPIrunMerger::Merge() 146 { 147 // G4cout << "G4VUserMPIrunMerger::Merge called" << G4endl; 148 149 DMSG(0, "G4VUserMPIrunMerger::Merge called"); 150 const MPI::Intracomm* parentComm = G4MPImanager::GetManager()->GetComm(); 151 const unsigned int myrank = parentComm->Get_rank(); 152 commSize = G4MPImanager::GetManager()->GetActiveSize(); 153 // do not include extra worker in this communication 154 155 if (commSize == 1) { 156 DMSG(1, "Comm world size is 1, nothing to do"); 157 return; 158 } 159 COMM_G4COMMAND_ = parentComm->Dup(); 160 bytesSent = 0; 161 const G4double sttime = MPI::Wtime(); 162 163 // Use G4MPIutils to optimize communications between ranks 164 typedef std::function<void(unsigned int)> handler_t; 165 using std::placeholders::_1; 166 handler_t sender = 
std::bind(&G4VUserMPIrunMerger::Send, this, _1); 167 handler_t receiver = std::bind(&G4VUserMPIrunMerger::Receive, this, _1); 168 std::function<void(void)> barrier = std::bind(&MPI::Intracomm::Barrier, &COMM_G4COMMAND_); 169 // G4cout << "go to G4mpi::Merge" << G4endl; 170 G4mpi::Merge(sender, receiver, barrier, commSize, myrank); 171 172 // OLD Style p2p communications 173 /* 174 if ( myrank != destinationRank ) { 175 DMSG(0,"Comm world size: "<<commSize<<" this rank is: " 176 <<myrank<<" sending to rank "<<destinationRank); 177 Send(destinationRank); 178 } else { 179 DMSG(1,"Comm world size: "<<commSize<<" this rank is: " 180 <<myrank<<" receiving. "); 181 for ( unsigned int i = 0 ; i<commSize ; ++i) { 182 if ( i != myrank ) Receive(i); 183 } 184 } 185 */ 186 const G4double elapsed = MPI::Wtime() - sttime; 187 long total = 0; 188 COMM_G4COMMAND_.Reduce(&bytesSent, &total, 1, MPI::LONG, MPI::SUM, destinationRank); 189 if (verbose > 0 && myrank == destinationRank) { 190 // Collect from ranks how much data was sent around 191 G4cout << "G4VUserMPIrunMerger::Merge() - data transfer performances: " 192 << double(total) / 1000. / elapsed << " kB/s" 193 << " (Total Data Transfer= " << double(total) / 1000 << " kB in " << elapsed << " s)." 194 << G4endl; 195 } 196 197 COMM_G4COMMAND_.Free(); 198 DMSG(0, "G4VUserMPIrunMerger::Merge done"); 199 } 200