Geant4 Cross Reference |
1 // 1 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 5 // of this software and associated documentati 6 // in the Software without restriction, includ 7 // to use, copy, modify, merge, publish, distr 8 // copies of the Software, and to permit perso 9 // furnished to do so, subject to the followin 10 // The above copyright notice and this permiss 11 // all copies or substantial portions of the S 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 18 // 19 // ------------------------------------------- 20 // Tasking class implementation 21 // Class Description: 22 // ------------------------------------------ 23 // Author: Jonathan Madsen 24 // ------------------------------------------ 25 26 #include "PTL/UserTaskQueue.hh" 27 28 #include "PTL/AutoLock.hh" 29 #include "PTL/ScopeDestructor.hh" 30 #include "PTL/TaskGroup.hh" 31 #include "PTL/ThreadData.hh" 32 #include "PTL/ThreadPool.hh" 33 34 #include <cassert> 35 #include <chrono> 36 #include <functional> 37 #include <iostream> 38 #include <map> 39 #include <stdexcept> 40 #include <thread> 41 #include <utility> 42 43 namespace PTL 44 { 45 //============================================ 46 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers 48 : VUserTaskQueue(nworkers) 49 , m_is_clone((parent) != nullptr) 50 , m_thread_bin((parent) ? (ThreadPool::get_thi 51 , m_insert_bin((parent) ? (ThreadPool::get_thi 52 , m_hold((parent) ? parent->m_hold : new std:: 53 , m_ntasks((parent) ? parent->m_ntasks : new s 54 , m_mutex((parent) ? parent->m_mutex : new Mut 55 , m_subqueues((parent) ? parent->m_subqueues : 56 { 57 // create nthreads + 1 subqueues so there 58 if(!parent) 59 { 60 for(intmax_t i = 0; i < nworkers + 1; 61 m_subqueues->emplace_back(new Task 62 } 63 } 64 65 //============================================ 66 67 UserTaskQueue::~UserTaskQueue() 68 { 69 if(!m_is_clone) 70 { 71 for(auto& itr : *m_subqueues) 72 { 73 assert(itr->empty()); 74 delete itr; 75 } 76 m_subqueues->clear(); 77 delete m_hold; 78 delete m_ntasks; 79 delete m_mutex; 80 delete m_subqueues; 81 } 82 } 83 84 //============================================ 85 86 void 87 UserTaskQueue::resize(intmax_t n) 88 { 89 if(!m_mutex) 90 throw std::runtime_error("nullptr to m 91 AutoLock lk(m_mutex); 92 if(m_workers < n) 93 { 94 while(m_workers < n) 95 { 96 m_subqueues->emplace_back(new Task 97 ++m_workers; 98 } 99 } 100 else if(m_workers > n) 101 { 102 while(m_workers > n) 103 { 104 delete m_subqueues->back(); 105 m_subqueues->pop_back(); 106 --m_workers; 107 } 108 } 109 } 110 111 //============================================ 112 113 VUserTaskQueue* 114 UserTaskQueue::clone() 115 { 116 return new UserTaskQueue(workers(), this); 117 } 118 //============================================ 119 120 intmax_t 121 UserTaskQueue::GetThreadBin() const 122 { 123 // get a thread id number 124 static thread_local intmax_t tl_bin = 125 (m_thread_bin + ThreadPool::get_this_t 126 return tl_bin; 127 } 128 129 //============================================ 130 131 intmax_t 132 UserTaskQueue::GetInsertBin() const 133 { 134 return (++m_insert_bin % (m_workers + 1)); 135 } 136 137 //============================================ 138 139 UserTaskQueue::task_pointer 140 UserTaskQueue::GetThreadBinTask() 141 { 142 intmax_t tbin = GetThreadBin(); 143 TaskSubQueue* task_subq = (*m_subqueues)[t 144 task_pointer _task = nullptr; 145 146 //---------------------------------------- 147 auto get_task = [&]() { 148 if(task_subq->AcquireClaim()) 149 { 150 // run task 151 _task = task_subq->PopTask(true); 152 // release the claim on the bin 153 task_subq->ReleaseClaim(); 154 } 155 if(_task) 156 --(*m_ntasks); 157 // return success if valid pointer 158 return (_task != nullptr); 159 }; 160 //---------------------------------------- 161 162 // while not empty 163 while(!task_subq->empty()) 164 { 165 if(get_task()) 166 break; 167 } 168 return _task; 169 } 170 171 //============================================ 172 173 UserTaskQueue::task_pointer 174 UserTaskQueue::GetTask(intmax_t subq, intmax_t 175 { 176 // exit if empty 177 if(this->true_empty()) 178 return nullptr; 179 180 // ensure the thread has a bin assignment 181 intmax_t tbin = GetThreadBin(); 182 intmax_t n = (subq < 0) ? tbin : subq; 183 if(nitr < 1) 184 nitr = (m_workers + 1); // * m_ntasks 185 186 if(m_hold->load(std::memory_order_relaxed) 187 { 188 return GetThreadBinTask(); 189 } 190 191 task_pointer _task = nullptr; 192 //---------------------------------------- 193 auto get_task = [&](intmax_t _n) { 194 TaskSubQueue* task_subq = (*m_subqueue 195 // try to acquire a claim for the bin 196 // if acquired, no other threads will 197 if(!task_subq->empty() && task_subq->A 198 { 199 // pop task out of bin 200 _task = task_subq->PopTask(n == tb 201 // release the claim on the bin 202 task_subq->ReleaseClaim(); 203 } 204 if(_task) 205 --(*m_ntasks); 206 // return success if valid pointer 207 return (_task != nullptr); 208 }; 209 //---------------------------------------- 210 211 // there are num_workers+1 bins so there i 212 // execute num_workers+2 iterations so the 213 // while(!empty()) 214 { 215 for(intmax_t i = 0; i < nitr; ++i, ++n 216 { 217 if(get_task(n % (m_workers + 1))) 218 return _task; 219 } 220 } 221 222 // only reached if looped over all bins (a 223 // and found no work so return an empty ta 224 // sleep if there is still no work by the 225 // condition variable 226 return _task; 227 } 228 229 //============================================ 230 231 intmax_t 232 UserTaskQueue::InsertTask(task_pointer&& task, 233 { 234 // increment number of tasks 235 ++(*m_ntasks); 236 237 bool spin = m_hold->load(std::memory_o 238 intmax_t tbin = GetThreadBin(); 239 240 if(data && data->within_task) 241 { 242 subq = tbin; 243 // spin = true; 244 } 245 246 // subq is -1 unless specified so unless s 247 // GetInsertBin() call increments a counte 248 // counter % (num_workers + 1) so that tas 249 // among the bins 250 intmax_t n = (subq < 0) ? GetInsertBin() : 251 252 //---------------------------------------- 253 auto insert_task = [&](intmax_t _n) { 254 TaskSubQueue* task_subq = (*m_subqueue 255 // TaskSubQueue* next_subq = (*m_subqu 256 // if not threads bin and size differe 257 // if(n != tbin && next_subq->size() < 258 // task_subq = next_subq; 259 // try to acquire a claim for the bin 260 // if acquired, no other threads will 261 if(task_subq->AcquireClaim()) 262 { 263 // push the task into the bin 264 task_subq->PushTask(std::move(task 265 // release the claim on the bin 266 task_subq->ReleaseClaim(); 267 // return success 268 return true; 269 } 270 return false; 271 }; 272 //---------------------------------------- 273 274 // if not in "hold/spin mode", where threa 275 // specified bin, then move onto next bin 276 // 277 if(spin) 278 { 279 n = n % (m_workers + 1); 280 while(!insert_task(n)) 281 ; 282 return n; 283 } 284 285 // there are num_workers+1 bins so there i 286 // execute num_workers+2 iterations so the 287 while(true) 288 { 289 auto _n = (n++) % (m_workers + 1); 290 if(insert_task(_n)) 291 return _n; 292 } 293 } 294 295 //============================================ 296 297 void 298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* 299 { 300 using task_group_type = TaskGroup<int 301 using thread_execute_map_t = std::map<int6 302 303 if(!tp->is_alive()) 304 { 305 func(); 306 return; 307 } 308 309 task_group_type tg{ [](int& ref, int i) { 310 311 // wait for all threads to finish any work 312 // NOTE: will cause deadlock if called fro 313 while(tp->get_active_threads_count() > 0) 314 ThisThread::sleep_for(std::chrono::mil 315 316 thread_execute_map_t thread 317 std::vector<std::shared_ptr<VTask>> _tasks 318 _tasks.reserve(m_workers + 1); 319 320 AcquireHold(); 321 for(int i = 0; i < (m_workers + 1); ++i) 322 { 323 if(i == GetThreadBin()) 324 continue; 325 326 //------------------------------------ 327 auto thread_specific_func = [&]() { 328 ScopeDestructor _dtor = tg.get_sco 329 static Mutex _mtx; 330 _mtx.lock(); 331 bool& _executed = thread_execute_m 332 _mtx.unlock(); 333 if(!_executed) 334 { 335 func(); 336 _executed = true; 337 return 1; 338 } 339 return 0; 340 }; 341 //------------------------------------ 342 343 InsertTask(tg.wrap(thread_specific_fun 344 } 345 346 tp->notify_all(); 347 int nexecuted = tg.join(); 348 if(nexecuted != m_workers) 349 { 350 std::stringstream msg; 351 msg << "Failure executing routine on a 352 << " threads executed function out 353 std::cerr << msg.str() << std::endl; 354 } 355 ReleaseHold(); 356 } 357 358 //============================================ 359 360 void 361 UserTaskQueue::ExecuteOnSpecificThreads(Thread 362 functi 363 { 364 using task_group_type = TaskGroup<int 365 using thread_execute_map_t = std::map<int6 366 367 task_group_type tg{ [](int& ref, int i) { 368 369 // wait for all threads to finish any work 370 // NOTE: will cause deadlock if called fro 371 while(tp->get_active_threads_count() > 0) 372 ThisThread::sleep_for(std::chrono::mil 373 374 if(!tp->is_alive()) 375 { 376 func(); 377 return; 378 } 379 380 thread_execute_map_t thread_execute_map{}; 381 382 //======================================== 383 // wrap the function so that it will only 384 // has an ID in the set 385 auto thread_specific_func = [&]() { 386 ScopeDestructor _dtor = tg.get_scope_d 387 static Mutex _mtx; 388 _mtx.lock(); 389 bool& _executed = thread_execute_map[G 390 _mtx.unlock(); 391 if(!_executed && tid_set.count(ThisThr 392 { 393 func(); 394 _executed = true; 395 return 1; 396 } 397 return 0; 398 }; 399 //======================================== 400 401 if(tid_set.count(ThisThread::get_id()) > 0 402 func(); 403 404 AcquireHold(); 405 for(int i = 0; i < (m_workers + 1); ++i) 406 { 407 if(i == GetThreadBin()) 408 continue; 409 410 InsertTask(tg.wrap(thread_specific_fun 411 } 412 tp->notify_all(); 413 decltype(tid_set.size()) nexecuted = tg.jo 414 if(nexecuted != tid_set.size()) 415 { 416 std::stringstream msg; 417 msg << "Failure executing routine on s 418 << " threads executed function out 419 std::cerr << msg.str() << std::endl; 420 } 421 ReleaseHold(); 422 } 423 424 //============================================ 425 426 void 427 UserTaskQueue::AcquireHold() 428 { 429 bool _hold; 430 while(!(_hold = m_hold->load(std::memory_o 431 { 432 m_hold->compare_exchange_strong(_hold, 433 std::m 434 } 435 } 436 437 //============================================ 438 439 void 440 UserTaskQueue::ReleaseHold() 441 { 442 bool _hold; 443 while((_hold = m_hold->load(std::memory_or 444 { 445 m_hold->compare_exchange_strong(_hold, 446 std::m 447 } 448 } 449 450 //============================================ 451 452 } // namespace PTL 453