// Geant4 Cross Reference
1 // Copyright (C) 2010, Guy Barrand. All rights reserved. 2 // See the file tools.license for terms. 3 4 #ifndef tools_wroot_mpi_ntuple_column_wise 5 #define tools_wroot_mpi_ntuple_column_wise 6 7 // MPI pntuple. It uses the tools/impi interface. 8 9 #include "base_pntuple_column_wise" 10 #include "mpi_basket_add" 11 #include "impi_ntuple" 12 13 #include "../S_STRING" 14 #include "../forit" 15 16 namespace tools { 17 namespace wroot { 18 19 class mpi_ntuple_column_wise : public base_pntuple_column_wise, public virtual impi_ntuple { 20 typedef base_pntuple_column_wise parent; 21 protected: 22 class basket_add : public mpi_basket_add { 23 typedef mpi_basket_add parent; 24 public: 25 virtual bool add_basket(basket* a_basket) { // we get ownership of a_basket. 26 if(m_row_mode) { 27 m_parallel_branch.m_parallel_baskets.push_back(a_basket); 28 if(ready_to_flush_baskets(m_cols)) {return flush_baskets(m_mpi,m_dest,m_tag,m_id,m_cols);} 29 return true; 30 } 31 bool status = mpi_send_basket(m_mpi,m_dest,m_tag,m_id,m_icol,*a_basket); 32 delete a_basket; 33 return status; 34 } 35 public: 36 basket_add(impi& a_mpi,int a_dest,int a_tag,uint32 a_id,uint32 a_icol, 37 branch& a_parallel_branch, 38 std::vector<icol*>& a_cols, 39 bool a_row_mode) 40 :parent(a_mpi,a_dest,a_tag,a_id,a_icol) 41 ,m_parallel_branch(a_parallel_branch) 42 ,m_cols(a_cols) 43 ,m_row_mode(a_row_mode) 44 {} 45 protected: 46 basket_add(const basket_add& a_from) 47 :branch::iadd_basket(a_from) 48 ,parent(a_from) 49 ,m_parallel_branch(a_from.m_parallel_branch) 50 ,m_cols(a_from.m_cols) 51 ,m_row_mode(a_from.m_row_mode) 52 {} 53 basket_add& operator=(const basket_add& a_from){ 54 parent::operator=(a_from); 55 m_row_mode = a_from.m_row_mode; 56 return *this; 57 } 58 protected: 59 branch& m_parallel_branch; 60 std::vector<icol*>& m_cols; 61 bool m_row_mode; 62 }; 63 64 public: 65 virtual bool add_row(impi& a_mpi,int a_dest,int a_tag) { 66 if(m_cols.empty()) return false; 67 68 tools_vforit(icol*,m_cols,it) 
(*it)->add(); 69 70 uint32 _icol = 0; 71 tools_vforit(icol*,m_cols,it) { 72 basket_add _badd(a_mpi,a_dest,a_tag,m_id,_icol,(*it)->get_branch(),m_cols,m_row_mode);_icol++; 73 if(!(*it)->get_branch().pfill(_badd,m_nev)) return false; 74 } 75 76 tools_vforit(icol*,m_cols,it) (*it)->set_def(); 77 return true; 78 } 79 80 virtual bool end_fill(impi& a_mpi,int a_dest,int a_tag) { 81 uint32 _icol = 0; 82 tools_vforit(icol*,m_cols,it) { 83 basket_add _badd(a_mpi,a_dest,a_tag,m_id,_icol,(*it)->get_branch(),m_cols,m_row_mode);_icol++; 84 if(!(*it)->get_branch().end_pfill(_badd)) return false; 85 } 86 87 if(m_row_mode) { 88 size_t number; 89 bool status = flush_remaining_baskets(number,a_mpi,a_dest,a_tag,m_id,m_cols); 90 if(number) { 91 m_out << "tools::wroot::mpi_ntuple_column_wise::end_fill : it remained " << number << " baskets not written on file." << std::endl; 92 status = false; 93 } 94 if(!status) return false; 95 } 96 97 a_mpi.pack_reset(); 98 if(!a_mpi.pack(mpi_protocol_end_fill())) return false; 99 if(!a_mpi.pack(m_id)) return false; 100 if(!end_leaves(a_mpi)) return false; 101 if(!a_mpi.send_buffer(a_dest,a_tag)) return false; 102 103 return true; 104 } 105 106 public: 107 mpi_ntuple_column_wise(uint32 a_id,std::ostream& a_out, 108 bool a_byte_swap,uint32 a_compression,seek a_seek_directory, 109 const std::string& a_name,const std::string& a_title, 110 bool a_row_mode,uint32 a_nev, 111 bool a_verbose) 112 :parent(a_out,a_byte_swap,a_compression,a_seek_directory,a_name,a_title,a_verbose) 113 ,m_id(a_id) 114 ,m_row_mode(a_row_mode) 115 ,m_nev(a_nev) 116 { 117 if(m_row_mode) { 118 if(!m_nev) m_nev = 4000; //4000*sizeof(double) = 32000 = default basket size. 
119 } else { 120 m_nev = 0; 121 } 122 } 123 124 mpi_ntuple_column_wise(uint32 a_id,std::ostream& a_out, 125 bool a_byte_swap,uint32 a_compression,seek a_seek_directory, 126 const std::vector<uint32>& a_basket_sizes,const ntuple_booking& a_bkg, 127 bool a_row_mode,uint32 a_nev, 128 bool a_verbose) 129 :parent(a_out,a_byte_swap,a_compression,a_seek_directory,a_basket_sizes,a_bkg,a_verbose) 130 ,m_id(a_id) 131 ,m_row_mode(a_row_mode) 132 ,m_nev(a_nev) 133 { 134 if(m_row_mode) { 135 if(!m_nev) m_nev = 4000; //4000*sizeof(double) = 32000 = default basket size. 136 } else { 137 m_nev = 0; 138 } 139 } 140 virtual ~mpi_ntuple_column_wise() {} 141 protected: 142 mpi_ntuple_column_wise(const mpi_ntuple_column_wise& a_from) 143 :impi_ntuple(a_from) 144 ,parent(a_from) 145 ,m_row_mode(a_from.m_row_mode) 146 ,m_nev(a_from.m_nev) 147 {} 148 mpi_ntuple_column_wise& operator=(const mpi_ntuple_column_wise& a_from){parent::operator=(a_from);return *this;} 149 protected: 150 static bool ready_to_flush_baskets(std::vector<icol*>& a_cols) { 151 //return true if all parallel branches have at least one basket in their m_parallel_baskets. 
152 if(a_cols.empty()) return false; 153 tools_vforit(icol*,a_cols,it) { 154 branch& _branch = (*it)->get_branch(); 155 if(_branch.m_parallel_baskets.empty()) return false; 156 } 157 return true; 158 } 159 static bool flush_baskets(impi& a_mpi,int a_dest,int a_tag,uint32 a_id,std::vector<icol*>& a_cols) { 160 a_mpi.pack_reset(); 161 if(!a_mpi.pack(mpi_protocol_baskets())) return false; 162 if(!a_mpi.pack(a_id)) return false; 163 164 bool status = true; 165 uint32 _icol = 0; 166 tools_vforit(icol*,a_cols,it) { 167 branch& _branch = (*it)->get_branch(); 168 basket* _front_basket = _branch.m_parallel_baskets.front(); 169 if(status) { 170 if(!mpi_pack_basket(a_mpi,_icol,*_front_basket)) status = false; 171 } 172 _branch.m_parallel_baskets.erase(_branch.m_parallel_baskets.begin()); 173 delete _front_basket; 174 _icol++; 175 } 176 if(!status) return false; 177 178 return a_mpi.send_buffer(a_dest,a_tag); 179 } 180 181 static bool flush_remaining_baskets(size_t& a_number,impi& a_mpi,int a_dest,int a_tag,uint32 a_id,std::vector<icol*>& a_cols) { 182 a_number = 0; 183 while(ready_to_flush_baskets(a_cols)) { 184 if(!flush_baskets(a_mpi,a_dest,a_tag,a_id,a_cols)) return false; 185 } 186 // look for pending baskets. 
187 {tools_vforit(icol*,a_cols,it) { 188 branch& _branch = (*it)->get_branch(); 189 a_number += _branch.m_parallel_baskets.size(); 190 }} 191 {tools_vforit(icol*,a_cols,it) { 192 branch& _branch = (*it)->get_branch(); 193 safe_clear(_branch.m_parallel_baskets); 194 }} 195 return true; 196 } 197 protected: 198 bool end_leaves(impi& a_mpi) const { 199 #include "MPI_SET_MAX.icc" 200 tools_vforcit(icol*,m_cols,pit) { 201 base_leaf* _pleaf = (*pit)->get_leaf(); 202 203 bool set_done = false; 204 205 TOOLS_WROOT_MPI_NTUPLE_LEAF_STRING_SET_LENGTH_MAX 206 207 if(!set_done) { 208 if(!a_mpi.pack((uint32)0)) return false; 209 if(!a_mpi.pack((int)0)) return false; 210 } 211 } 212 #undef TOOLS_WROOT_MPI_NTUPLE_SET_MAX 213 #undef TOOLS_WROOT_MPI_NTUPLE_STRING_SET_MAX 214 return true; 215 } 216 protected: 217 uint32 m_id; 218 bool m_row_mode; 219 uint32 m_nev; 220 }; 221 222 }} 223 224 #endif