// Geant4 Cross Reference
1 // 1 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 5 // of this software and associated documentati 6 // in the Software without restriction, includ 7 // to use, copy, modify, merge, publish, distr 8 // copies of the Software, and to permit perso 9 // furnished to do so, subject to the followin 10 // The above copyright notice and this permiss 11 // all copies or substantial portions of the S 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 18 // 19 // ------------------------------------------- 20 // Tasking class header file 21 // 22 // Class Description: 23 // 24 // This file creates a class for an efficient 25 // accepts work in the form of tasks. 26 // 27 // ------------------------------------------- 28 // Author: Jonathan Madsen (Feb 13th 2018) 29 // ------------------------------------------- 30 31 #pragma once 32 33 #include "PTL/AutoLock.hh" 34 #ifndef G4GMAKE 35 #include "" 36 #endif 37 #include "PTL/ThreadData.hh" 38 #include "PTL/Threading.hh" 39 #include "PTL/Types.hh" 40 #include "PTL/VTask.hh" 41 #include "PTL/VUserTaskQueue.hh" 42 43 #if defined(PTL_USE_TBB) 44 # if !defined(TBB_SUPPRESS_DEPRECATED_MESSA 45 # define TBB_SUPPRESS_DEPRECATED_MESSAG 46 # endif 47 # if !defined(TBB_PREVIEW_GLOBAL_CONTROL) 48 # define TBB_PREVIEW_GLOBAL_CONTROL 1 49 # endif 50 # include <tbb/global_control.h> 51 # include <tbb/task_arena.h> 52 # include <tbb/task_group.h> 53 #endif 54 55 #include <algorithm> 56 #include <atomic> 57 #include <chrono> 58 #include <cstdint> 59 #include <cstdlib> 60 #include <deque> 61 #include <functional> 62 #include <iostream> 63 #include <map> 64 #include <memory> 65 #include <mutex> // IWYU pragma: keep 66 #include <set> 67 
#include <thread> 68 #include <type_traits> // IWYU pragma: keep 69 #include <unordered_map> 70 #include <utility> 71 #include <vector> 72 73 namespace PTL 74 { 75 namespace thread_pool 76 { 77 namespace state 78 { 79 static const short STARTED = 0; 80 static const short PARTIAL = 1; 81 static const short STOPPED = 2; 82 static const short NONINIT = 3; 83 84 } // namespace state 85 } // namespace thread_pool 86 87 class ThreadPool 88 { 89 public: 90 template <typename KeyT, typename MappedT, 91 using uomap = std::unordered_map<KeyT, Map 92 93 // pod-types 94 using size_type = size_t; 95 using task_count_type = std::shared_ptr<s 96 using atomic_int_type = std::shared_ptr<s 97 using pool_state_type = std::shared_ptr<s 98 using atomic_bool_type = std::shared_ptr<s 99 // objects 100 using task_type = VTask; 101 using lock_t = std::shared_ptr<Mutex 102 using condition_t = std::shared_ptr<Condi 103 using task_pointer = std::shared_ptr<task_ 104 using task_queue_t = VUserTaskQueue; 105 // containers 106 using thread_list_t = std::deque<Thre 107 using bool_list_t = std::vector<boo 108 using thread_id_map_t = std::map<Thread 109 using thread_index_map_t = std::map<uintma 110 using thread_vec_t = std::vector<Thr 111 using thread_data_t = std::vector<std 112 // functions 113 using initialize_func_t = std::function<vo 114 using finalize_func_t = std::function<vo 115 using affinity_func_t = std::function<in 116 117 static affinity_func_t& affinity_functor() 118 { 119 static affinity_func_t _v = [](intmax_ 120 static std::atomic<intmax_t> assig 121 intmax_t _assi 122 return _assign % Thread::hardware_ 123 }; 124 return _v; 125 } 126 127 static initialize_func_t& initialization_f 128 { 129 static initialize_func_t _v = []() {}; 130 return _v; 131 } 132 133 static finalize_func_t& finalization_funct 134 { 135 static finalize_func_t _v = []() {}; 136 return _v; 137 } 138 139 struct Config 140 { 141 bool init = true; 142 bool use_tbb = false 143 bool use_affinity = false 144 int 
verbose = 0; 145 int priority = 0; 146 size_type pool_size = f_def 147 VUserTaskQueue* task_queue = nullp 148 affinity_func_t set_affinity = affin 149 initialize_func_t initializer = initi 150 finalize_func_t finalizer = final 151 }; 152 153 public: 154 // Constructor and Destructors 155 explicit ThreadPool(const Config&); 156 ~ThreadPool(); 157 ThreadPool(const ThreadPool&) = delete; 158 ThreadPool(ThreadPool&&) = default; 159 ThreadPool& operator=(const ThreadPool&) = 160 ThreadPool& operator=(ThreadPool&&) = defa 161 162 public: 163 // Public functions 164 size_type initialize_threadpool(size_type) 165 size_type destroy_threadpool(); 166 size_type stop_thread(); 167 168 template <typename FuncT> 169 void execute_on_all_threads(FuncT&& _func) 170 171 template <typename FuncT> 172 void execute_on_specific_threads(const std 173 FuncT&& 174 175 task_queue_t* get_queue() const { return 176 task_queue_t*& get_valid_queue(task_queue_ 177 178 bool is_tbb_threadpool() const { return m_ 179 180 public: 181 /// set the default pool size 182 static void set_default_size(size_type _v) 183 184 /// get the default pool size 185 static size_type get_default_size() { retu 186 187 public: 188 // add tasks for threads to process 189 size_type add_task(task_pointer&& task, in 190 // size_type add_thread_task(ThreadId id, 191 // add a generic container with iterator 192 template <typename ListT> 193 size_type add_tasks(ListT&); 194 195 Thread* get_thread(size_type _n) const; 196 Thread* get_thread(std::thread::id id) con 197 198 // only relevant when compiled with PTL_US 199 static tbb_global_control_t*& tbb_global_c 200 201 void set_initialization(initialize_func_t 202 void set_finalization(finalize_func_t f) { 203 204 void reset_initialization() 205 { 206 m_init_func = []() {}; 207 } 208 void reset_finalization() 209 { 210 m_fini_func = []() {}; 211 } 212 213 public: 214 // get the pool state 215 const pool_state_type& state() const { ret 216 // see how many main task threads 
there ar 217 size_type size() const { return m_pool_siz 218 // set the thread pool size 219 void resize(size_type _n); 220 // affinity assigns threads to cores, assi 221 bool using_affinity() const { return m_use 222 bool is_alive() { return m_alive_flag->loa 223 void notify(); 224 void notify_all(); 225 void notify(size_type); 226 bool is_initialized() const; 227 int get_active_threads_count() const { re 228 229 void set_affinity(affinity_func_t f) { m_a 230 void set_affinity(intmax_t i, Thread&) con 231 void set_priority(int _prio, Thread&) cons 232 233 void set_verbose(int n) { m_verbose = n; } 234 int get_verbose() const { return m_verbos 235 bool is_main() const { return ThisThread:: 236 237 tbb_task_arena_t* get_task_arena(); 238 239 public: 240 // read FORCE_NUM_THREADS environment vari 241 static const thread_id_map_t& get_thread_i 242 static uintmax_t get_thread_i 243 static uintmax_t get_this_thr 244 static uintmax_t add_thread_i 245 246 private: 247 void execute_thread(VUserTaskQueue*); // 248 int insert(task_pointer&&, int = -1); 249 int run_on_this(task_pointer&&); 250 251 private: 252 // called in THREAD INIT 253 static void start_thread(ThreadPool*, thre 254 255 void record_entry(); 256 void record_exit(); 257 258 private: 259 // Private variables 260 // random 261 bool m_use_affinity = fal 262 bool m_tbb_tp = fal 263 bool m_delete_task_queue = fal 264 int m_verbose = 0; 265 int m_priority = 0; 266 size_type m_pool_size = 0; 267 ThreadId m_main_tid = Thi 268 atomic_bool_type m_alive_flag = std 269 pool_state_type m_pool_state = std 270 atomic_int_type m_thread_awake = std 271 atomic_int_type m_thread_active = std 272 273 // locks 274 lock_t m_task_lock = std::make_shared<Mute 275 // conditions 276 condition_t m_task_cond = std::make_shared 277 278 // containers 279 bool_list_t m_is_joined = {}; // joi 280 bool_list_t m_is_stopped = {}; // let 281 thread_list_t m_main_threads = {}; // sto 282 thread_list_t m_stop_threads = {}; // sto 283 thread_vec_t 
m_threads = {}; 284 thread_data_t m_thread_data = {}; 285 286 // task queue 287 task_queue_t* m_task_queue = nullp 288 tbb_task_arena_t* m_tbb_task_arena = nullp 289 tbb_task_group_t* m_tbb_task_group = nullp 290 291 // functions 292 initialize_func_t m_init_func = initia 293 finalize_func_t m_fini_func = finali 294 affinity_func_t m_affinity_func = affini 295 296 private: 297 static size_type& f_default_pool_siz 298 static thread_id_map_t& f_thread_ids(); 299 }; 300 301 //-------------------------------------------- 302 inline void 303 ThreadPool::notify() 304 { 305 // wake up one thread that is waiting for 306 if(m_thread_awake->load() < m_pool_size) 307 { 308 AutoLock l(*m_task_lock); 309 m_task_cond->notify_one(); 310 } 311 } 312 //-------------------------------------------- 313 inline void 314 ThreadPool::notify_all() 315 { 316 // wake all threads 317 AutoLock l(*m_task_lock); 318 m_task_cond->notify_all(); 319 } 320 //-------------------------------------------- 321 inline void 322 ThreadPool::notify(size_type ntasks) 323 { 324 if(ntasks == 0) 325 return; 326 327 // wake up as many threads that tasks just 328 if(m_thread_awake->load() < m_pool_size) 329 { 330 AutoLock l(*m_task_lock); 331 if(ntasks < this->size()) 332 { 333 for(size_type i = 0; i < ntasks; + 334 m_task_cond->notify_one(); 335 } 336 else 337 { 338 m_task_cond->notify_all(); 339 } 340 } 341 } 342 //-------------------------------------------- 343 // local function for getting the tbb task sch 344 inline tbb_global_control_t*& 345 ThreadPool::tbb_global_control() 346 { 347 static thread_local tbb_global_control_t* 348 return _instance; 349 } 350 //-------------------------------------------- 351 // task arena 352 inline tbb_task_arena_t* 353 ThreadPool::get_task_arena() 354 { 355 #if defined(PTL_USE_TBB) 356 // create a task arena 357 if(!m_tbb_task_arena) 358 { 359 auto _sz = (tbb_global_control()) 360 ? 
tbb_global_control()- 361 tbb::global_contr 362 : size(); 363 m_tbb_task_arena = new tbb_task_arena_ 364 m_tbb_task_arena->initialize(_sz, 1); 365 } 366 #else 367 if(!m_tbb_task_arena) 368 m_tbb_task_arena = new tbb_task_arena_ 369 #endif 370 return m_tbb_task_arena; 371 } 372 //-------------------------------------------- 373 inline void 374 ThreadPool::resize(size_type _n) 375 { 376 initialize_threadpool(_n); 377 if(m_task_queue) 378 m_task_queue->resize(static_cast<intma 379 } 380 //-------------------------------------------- 381 inline int 382 ThreadPool::run_on_this(task_pointer&& _task) 383 { 384 auto&& _func = [_task]() { (*_task)(); }; 385 386 if(m_tbb_tp && m_tbb_task_group) 387 { 388 auto* _arena = get_task_arena(); 389 _arena->execute([this, _func]() { this 390 } 391 else 392 { 393 _func(); 394 } 395 // return the number of tasks added to tas 396 return 0; 397 } 398 //-------------------------------------------- 399 inline int 400 ThreadPool::insert(task_pointer&& task, int bi 401 { 402 static thread_local ThreadData* _data = Th 403 404 // pass the task to the queue 405 auto ibin = get_valid_queue(m_task_queue)- 406 notify(); 407 return (int)ibin; 408 } 409 //-------------------------------------------- 410 inline ThreadPool::size_type 411 ThreadPool::add_task(task_pointer&& task, int 412 { 413 // if not native (i.e. 
TBB) or we haven't 414 if(m_tbb_tp || !task->is_native_task() || 415 return static_cast<size_type>(run_on_t 416 417 return static_cast<size_type>(insert(std:: 418 } 419 //-------------------------------------------- 420 template <typename ListT> 421 inline ThreadPool::size_type 422 ThreadPool::add_tasks(ListT& c) 423 { 424 if(!m_alive_flag) // if we haven't built 425 { 426 for(auto& itr : c) 427 run(itr); 428 c.clear(); 429 return 0; 430 } 431 432 // TODO: put a limit on how many tasks can 433 auto c_size = c.size(); 434 for(auto& itr : c) 435 { 436 if(!itr->is_native_task()) 437 --c_size; 438 else 439 { 440 //++(m_task_queue); 441 get_valid_queue(m_task_queue)->Ins 442 } 443 } 444 c.clear(); 445 446 // notify sleeping threads 447 notify(c_size); 448 449 return c_size; 450 } 451 //-------------------------------------------- 452 template <typename FuncT> 453 inline void 454 ThreadPool::execute_on_all_threads(FuncT&& _fu 455 { 456 if(m_tbb_tp && m_tbb_task_group) 457 { 458 #if defined(PTL_USE_TBB) 459 // TBB lazily activates threads to pro 460 // participates in processing the task 461 // function to execute only on the wor 462 // 463 std::set<std::thread::id> _first{}; 464 Mutex _mutex{}; 465 // init function which executes functi 466 auto _init = [&]() { 467 int _once = 0; 468 _mutex.lock(); 469 if(_first.find(std::this_thread::g 470 { 471 // we need to reset this threa 472 // of the same template instan 473 _once = 1; 474 _first.insert(std::this_thread 475 } 476 _mutex.unlock(); 477 if(_once != 0) 478 { 479 _func(); 480 return 1; 481 } 482 return 0; 483 }; 484 // this will collect the number of thr 485 // executed the _init function above 486 std::atomic<size_t> _total_init{ 0 }; 487 // max parallelism by TBB 488 size_t _maxp = tbb_global_control()->a 489 tbb::global_control::max_allowed_p 490 // create a task arean 491 auto* _arena = get_task_arena(); 492 // size of the thread-pool 493 size_t _sz = size(); 494 // number of cores 495 size_t _ncore = 
GetNumberOfCores(); 496 // maximum depth for recursion 497 size_t _dmax = std::max<size_t>(_ncore 498 // how many threads we need to initial 499 size_t _num = std::min(_maxp, std::min 500 // this is the task passed to the task 501 std::function<void()> _init_task; 502 _init_task = [&]() { 503 add_thread_id(); 504 static thread_local size_type _dep 505 int _ret 506 // don't let the main thread execu 507 if(!is_main()) 508 { 509 // execute the function 510 _ret = _init(); 511 // add the result 512 _total_init += _ret; 513 } 514 // if the function did not return 515 // two more tasks 516 ++_depth; 517 if(_ret == 0 && _depth < _dmax && 518 { 519 tbb::task_group tg{}; 520 tg.run([&]() { _init_task(); } 521 tg.run([&]() { _init_task(); } 522 ThisThread::sleep_for(std::chr 523 tg.wait(); 524 } 525 --_depth; 526 }; 527 528 // TBB won't oversubscribe so we need 529 size_t nitr = 0; 530 auto _fname = __FUNCTION__; 531 auto _write_info = [&]() { 532 std::cout << "[" << _fname << "]> 533 << ", expected: " << _nu 534 << ", size: " << _sz << 535 }; 536 while(_total_init < _num) 537 { 538 auto _n = 2 * _num; 539 while(--_n > 0) 540 { 541 _arena->execute( 542 [&]() { m_tbb_task_group-> 543 } 544 _arena->execute([&]() { m_tbb_task 545 // don't loop infinitely but use a 546 if(nitr++ > 2 * (_num + 1) && (_to 547 { 548 _write_info(); 549 break; 550 } 551 // at this point we need to exit 552 if(nitr > 4 * (_ncore + 1)) 553 { 554 _write_info(); 555 break; 556 } 557 } 558 if(get_verbose() > 3) 559 _write_info(); 560 #endif 561 } 562 else if(get_queue()) 563 { 564 get_queue()->ExecuteOnAllThreads(this, 565 } 566 } 567 568 //-------------------------------------------- 569 570 template <typename FuncT> 571 inline void 572 ThreadPool::execute_on_specific_threads(const 573 FuncT& 574 { 575 if(m_tbb_tp && m_tbb_task_group) 576 { 577 #if defined(PTL_USE_TBB) 578 // TBB lazily activates threads to pro 579 // participates in processing the task 580 // function to execute only on the wor 581 // 
582 std::set<std::thread::id> _first{}; 583 Mutex _mutex{}; 584 // init function which executes functi 585 auto _exec = [&]() { 586 int _once = 0; 587 _mutex.lock(); 588 if(_first.find(std::this_thread::g 589 { 590 // we need to reset this threa 591 // of the same template instan 592 _once = 1; 593 _first.insert(std::this_thread 594 } 595 _mutex.unlock(); 596 if(_once != 0) 597 { 598 _func(); 599 return 1; 600 } 601 return 0; 602 }; 603 // this will collect the number of thr 604 // executed the _exec function above 605 std::atomic<size_t> _total_exec{ 0 }; 606 // number of cores 607 size_t _ncore = GetNumberOfCores(); 608 // maximum depth for recursion 609 size_t _dmax = std::max<size_t>(_ncore 610 // how many threads we need to initial 611 size_t _num = _tids.size(); 612 // create a task arena 613 auto* _arena = get_task_arena(); 614 // this is the task passed to the task 615 std::function<void()> _exec_task; 616 _exec_task = [&]() { 617 add_thread_id(); 618 static thread_local size_type _dep 619 int _ret 620 auto _thi 621 // don't let the main thread execu 622 if(_tids.count(_this_tid) > 0) 623 { 624 // execute the function 625 _ret = _exec(); 626 // add the result 627 _total_exec += _ret; 628 } 629 // if the function did not return 630 // two more tasks 631 ++_depth; 632 if(_ret == 0 && _depth < _dmax && 633 { 634 tbb::task_group tg{}; 635 tg.run([&]() { _exec_task(); } 636 tg.run([&]() { _exec_task(); } 637 ThisThread::sleep_for(std::chr 638 tg.wait(); 639 } 640 --_depth; 641 }; 642 643 // TBB won't oversubscribe so we need 644 size_t nitr = 0; 645 auto _fname = __FUNCTION__; 646 auto _write_info = [&]() { 647 std::cout << "[" << _fname << "]> 648 << ", expected: " << _nu 649 }; 650 while(_total_exec < _num) 651 { 652 auto _n = 2 * _num; 653 while(--_n > 0) 654 { 655 _arena->execute( 656 [&]() { m_tbb_task_group-> 657 } 658 _arena->execute([&]() { m_tbb_task 659 // don't loop infinitely but use a 660 if(nitr++ > 2 * (_num + 1) && (_to 661 { 662 _write_info(); 
663 break; 664 } 665 // at this point we need to exit 666 if(nitr > 8 * (_num + 1)) 667 { 668 _write_info(); 669 break; 670 } 671 } 672 if(get_verbose() > 3) 673 _write_info(); 674 #endif 675 } 676 else if(get_queue()) 677 { 678 get_queue()->ExecuteOnSpecificThreads( 679 } 680 } 681 682 //============================================ 683 684 } // namespace PTL 685