Geant4 Cross Reference |
1 // Copyright (C) 2010, Guy Barrand. All rights reserved. 2 // See the file tools.license for terms. 3 4 #ifndef tools_wroot_mt_ntuple_column_wise 5 #define tools_wroot_mt_ntuple_column_wise 6 7 // mt = multi-threads. 8 9 #include "base_pntuple_column_wise" 10 #include "mt_basket_add" 11 #include "imt_ntuple" 12 13 namespace tools { 14 namespace wroot { 15 16 class mt_ntuple_column_wise : public base_pntuple_column_wise, public virtual imt_ntuple { 17 typedef base_pntuple_column_wise parent; 18 protected: 19 class basket_add : public mt_basket_add { 20 typedef mt_basket_add parent; 21 public: 22 virtual bool add_basket(basket* a_basket) { // we get ownership of a_basket. 23 if(m_row_mode) { 24 //m_ntuple.m_out << "debug : add_basket in parallel branch name " << m_parallel_branch.name() 25 // << ", num existing baskets " << m_parallel_branch.m_parallel_baskets.size() << "." << std::endl; 26 m_parallel_branch.m_parallel_baskets.push_back(a_basket); 27 if(ready_to_flush_baskets(m_cols)) {return flush_baskets(m_mutex,m_main_file,m_cols,m_main_branches);} 28 return true; 29 } else { 30 m_mutex.lock(); 31 uint32 add_bytes,nout; 32 bool status = false; 33 if(m_main_branch.add_basket(m_main_file,*a_basket,add_bytes,nout)) { 34 m_main_branch.set_tot_bytes(m_main_branch.tot_bytes()+add_bytes); 35 m_main_branch.set_zip_bytes(m_main_branch.zip_bytes()+nout); 36 status = true; 37 } 38 m_mutex.unlock(); 39 delete a_basket; 40 return status; 41 } 42 } 43 public: 44 basket_add(imutex& a_mutex,ifile& a_main_file, 45 branch& a_main_branch,branch& a_parallel_branch, 46 std::vector<icol*>& a_cols, 47 std::vector<branch*>& a_main_branches, 48 bool a_row_mode) 49 :parent(a_mutex,a_main_file,a_main_branch) 50 ,m_parallel_branch(a_parallel_branch) 51 ,m_cols(a_cols) 52 ,m_main_branches(a_main_branches) 53 ,m_row_mode(a_row_mode) 54 {} 55 protected: 56 basket_add(const basket_add& a_from) 57 :branch::iadd_basket(a_from) 58 ,parent(a_from) 59 ,m_parallel_branch(a_from.m_parallel_branch) 60 ,m_cols(a_from.m_cols) 61 ,m_main_branches(a_from.m_main_branches) 62 ,m_row_mode(a_from.m_row_mode) 63 {} 64 basket_add& operator=(const basket_add&){return *this;} 65 protected: 66 branch& m_parallel_branch; 67 std::vector<icol*>& m_cols; 68 std::vector<branch*>& m_main_branches; 69 bool m_row_mode; 70 }; 71 public: 72 virtual bool add_row(imutex& a_mutex,ifile& a_main_file) { 73 if(m_cols.empty()) return false; 74 {tools_vforit(icol*,m_cols,it) (*it)->add();} 75 if(m_main_branches.size()!=m_cols.size()) { 76 m_out << "tools::wroot::mt_ntuple_column_wise::add_row :" 77 << " m_main_branches.size() (" << m_main_branches.size() << ") != " 78 << "m_cols.size() (" << m_cols.size() << ")." 79 << std::endl; 80 return false; 81 } 82 {std::vector<branch*>::const_iterator itb = m_main_branches.begin(); 83 tools_vforit(icol*,m_cols,it) { 84 branch* main_branch = (*itb);itb++; 85 basket_add _badd(a_mutex,a_main_file,*main_branch,(*it)->get_branch(),m_cols,m_main_branches,m_row_mode); 86 if(!(*it)->get_branch().pfill(_badd,m_nev)) return false; 87 }} 88 {tools_vforit(icol*,m_cols,it) (*it)->set_def();} 89 return true; 90 } 91 virtual bool end_fill(imutex& a_mutex,ifile& a_main_file) { 92 if(m_main_branches.size()!=m_cols.size()) { 93 m_out << "tools::wroot::mt_ntuple_column_wise::end_fill :" 94 << " m_main_branches.size() (" << m_main_branches.size() << ") != " 95 << "m_cols.size() (" << m_cols.size() << ")." 96 << std::endl; 97 return false; 98 } 99 std::vector<branch*>::const_iterator itb = m_main_branches.begin(); 100 tools_vforit(icol*,m_cols,it) { 101 branch* main_branch = (*itb);itb++; 102 basket_add _badd(a_mutex,a_main_file,*main_branch,(*it)->get_branch(),m_cols,m_main_branches,m_row_mode); 103 if(!(*it)->get_branch().end_pfill(_badd)) return false; 104 } 105 if(m_row_mode) { 106 size_t number; 107 bool status = flush_remaining_baskets(number,a_mutex,a_main_file,m_cols,m_main_branches); 108 if(number) { 109 m_out << "tools::wroot::mt_ntuple_column_wise::end_fill : it remained " << number << " baskets not written on file." << std::endl; 110 status = false; 111 } 112 if(!status) return false; 113 } 114 return end_leaves(a_mutex); 115 } 116 public: 117 mt_ntuple_column_wise(std::ostream& a_out,bool a_byte_swap,uint32 a_compression,seek a_seek_directory, 118 std::vector<branch*>& a_main_branches, 119 const std::string& a_name,const std::string& a_title, 120 bool a_row_mode,uint32 a_nev, 121 bool a_verbose) 122 :parent(a_out,a_byte_swap,a_compression,a_seek_directory,a_name,a_title,a_verbose) 123 ,m_main_branches(a_main_branches) 124 ,m_row_mode(a_row_mode) 125 ,m_nev(a_nev) 126 { 127 if(m_row_mode) { 128 if(!m_nev) m_nev = 4000; //4000*sizeof(double) = 32000 = default basket size. 129 } else { 130 m_nev = 0; 131 } 132 } 133 134 mt_ntuple_column_wise(std::ostream& a_out,bool a_byte_swap,uint32 a_compression,seek a_seek_directory, 135 std::vector<branch*>& a_main_branches, 136 const std::vector<uint32>& a_basket_sizes, 137 const ntuple_booking& a_bkg, 138 bool a_row_mode,uint32 a_nev, 139 bool a_verbose) 140 :parent(a_out,a_byte_swap,a_compression,a_seek_directory,a_basket_sizes,a_bkg,a_verbose) 141 ,m_main_branches(a_main_branches) 142 ,m_row_mode(a_row_mode) 143 ,m_nev(a_nev) 144 { 145 if(m_row_mode) { 146 if(!m_nev) m_nev = 4000; 147 } else { 148 m_nev = 0; 149 } 150 } 151 152 #ifdef tools_wroot_mt_ntuple_column_wise //g4tools backcomp : 153 mt_ntuple_column_wise(std::ostream& a_out,bool a_byte_swap,uint32 a_compression,seek a_seek_directory, 154 std::vector<branch*>& a_main_branches, 155 const std::vector<uint32>& a_basket_sizes, 156 const ntuple_booking& a_bkg, 157 bool a_verbose) 158 :parent(a_out,a_byte_swap,a_compression,a_seek_directory,a_basket_sizes,a_bkg,a_verbose) 159 ,m_main_branches(a_main_branches) 160 ,m_row_mode(false) 161 ,m_nev(0) 162 { 163 if(m_row_mode) { 164 if(!m_nev) m_nev = 4000; 165 } else { 166 m_nev = 0; 167 } 168 } 169 #endif 170 171 virtual ~mt_ntuple_column_wise() {} 172 protected: 173 mt_ntuple_column_wise(const mt_ntuple_column_wise& a_from) 174 :imt_ntuple(a_from) 175 ,parent(a_from) 176 ,m_main_branches(a_from.m_main_branches) 177 ,m_row_mode(a_from.m_row_mode) 178 ,m_nev(a_from.m_nev) 179 {} 180 mt_ntuple_column_wise& operator=(const mt_ntuple_column_wise& a_from){parent::operator=(a_from);return *this;} 181 protected: 182 static bool ready_to_flush_baskets(std::vector<icol*>& a_cols) { 183 //return true if all parallel branches have at least one basket in their m_parallel_baskets. 184 if(a_cols.empty()) return false; 185 tools_vforit(icol*,a_cols,it) { 186 branch& _branch = (*it)->get_branch(); 187 if(_branch.m_parallel_baskets.empty()) return false; 188 } 189 return true; 190 } 191 static bool flush_baskets(imutex& a_mutex,ifile& a_main_file,std::vector<icol*>& a_cols,std::vector<branch*>& a_main_branches) { 192 a_mutex.lock(); 193 bool status = true; 194 std::vector<branch*>::const_iterator itb = a_main_branches.begin(); 195 tools_vforit(icol*,a_cols,it) { 196 branch& _branch = (*it)->get_branch(); 197 basket* _front_basket = _branch.m_parallel_baskets.front(); 198 branch* main_branch = (*itb);itb++; 199 if(status) { 200 uint32 add_bytes,nout; 201 if(main_branch->add_basket(a_main_file,*_front_basket,add_bytes,nout)) { 202 main_branch->set_tot_bytes(main_branch->tot_bytes()+add_bytes); 203 main_branch->set_zip_bytes(main_branch->zip_bytes()+nout); 204 } else { 205 status = false; 206 } 207 } 208 _branch.m_parallel_baskets.erase(_branch.m_parallel_baskets.begin()); 209 delete _front_basket; 210 } 211 a_mutex.unlock(); 212 return status; 213 } 214 215 static bool flush_remaining_baskets(size_t& a_number,imutex& a_mutex,ifile& a_main_file,std::vector<icol*>& a_cols,std::vector<branch*>& a_main_branches) { 216 a_number = 0; 217 while(ready_to_flush_baskets(a_cols)) { 218 if(!flush_baskets(a_mutex,a_main_file,a_cols,a_main_branches)) return false; 219 } 220 // look for pending baskets. 221 {tools_vforit(icol*,a_cols,it) { 222 branch& _branch = (*it)->get_branch(); 223 a_number += _branch.m_parallel_baskets.size(); 224 }} 225 {tools_vforit(icol*,a_cols,it) { 226 branch& _branch = (*it)->get_branch(); 227 safe_clear(_branch.m_parallel_baskets); 228 }} 229 return true; 230 } 231 232 bool end_leaves(imutex& a_mutex) const { 233 234 #include "MT_SET_MAX.icc" 235 236 std::vector<icol*>::const_iterator pit = m_cols.begin(); 237 tools_vforcit(branch*,m_main_branches,mit) { 238 if((*mit)->leaves().empty()) { 239 m_out << "tools::wroot::mt_ntuple_column_wise::end_leaves :" 240 << " branch " << (*mit)->name() << " without leaf." << std::endl; 241 return false; 242 } 243 244 base_leaf* _mleaf = *((*mit)->leaves().begin()); 245 base_leaf* _pleaf = (*pit)->get_leaf(); pit++; //WARNING. 246 247 TOOLS_WROOT_MT_NTUPLE_STRING_SET_MAX 248 249 } 250 #undef TOOLS_WROOT_MT_NTUPLE_STRING_SET_MAX 251 252 return true; 253 } 254 protected: 255 std::vector<branch*>& m_main_branches; 256 bool m_row_mode; 257 uint32 m_nev; 258 }; 259 260 }} 261 262 #endif