// Geant4 Cross Reference
//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------
// Tasking class header file
//
// Class Description:
//
// This file creates a class for an efficient thread-pool that
// accepts work in the form of tasks.
26 // 27 // --------------------------------------------------------------- 28 // Author: Jonathan Madsen (Feb 13th 2018) 29 // --------------------------------------------------------------- 30 31 #pragma once 32 33 #include "PTL/AutoLock.hh" 34 #ifndef G4GMAKE 35 #include "" 36 #endif 37 #include "PTL/ThreadData.hh" 38 #include "PTL/Threading.hh" 39 #include "PTL/Types.hh" 40 #include "PTL/VTask.hh" 41 #include "PTL/VUserTaskQueue.hh" 42 43 #if defined(PTL_USE_TBB) 44 # if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES) 45 # define TBB_SUPPRESS_DEPRECATED_MESSAGES 1 46 # endif 47 # if !defined(TBB_PREVIEW_GLOBAL_CONTROL) 48 # define TBB_PREVIEW_GLOBAL_CONTROL 1 49 # endif 50 # include <tbb/global_control.h> 51 # include <tbb/task_arena.h> 52 # include <tbb/task_group.h> 53 #endif 54 55 #include <algorithm> 56 #include <atomic> 57 #include <chrono> 58 #include <cstdint> 59 #include <cstdlib> 60 #include <deque> 61 #include <functional> 62 #include <iostream> 63 #include <map> 64 #include <memory> 65 #include <mutex> // IWYU pragma: keep 66 #include <set> 67 #include <thread> 68 #include <type_traits> // IWYU pragma: keep 69 #include <unordered_map> 70 #include <utility> 71 #include <vector> 72 73 namespace PTL 74 { 75 namespace thread_pool 76 { 77 namespace state 78 { 79 static const short STARTED = 0; 80 static const short PARTIAL = 1; 81 static const short STOPPED = 2; 82 static const short NONINIT = 3; 83 84 } // namespace state 85 } // namespace thread_pool 86 87 class ThreadPool 88 { 89 public: 90 template <typename KeyT, typename MappedT, typename HashT = KeyT> 91 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>; 92 93 // pod-types 94 using size_type = size_t; 95 using task_count_type = std::shared_ptr<std::atomic_uintmax_t>; 96 using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>; 97 using pool_state_type = std::shared_ptr<std::atomic_short>; 98 using atomic_bool_type = std::shared_ptr<std::atomic_bool>; 99 // objects 100 using 
task_type = VTask; 101 using lock_t = std::shared_ptr<Mutex>; 102 using condition_t = std::shared_ptr<Condition>; 103 using task_pointer = std::shared_ptr<task_type>; 104 using task_queue_t = VUserTaskQueue; 105 // containers 106 using thread_list_t = std::deque<ThreadId>; 107 using bool_list_t = std::vector<bool>; 108 using thread_id_map_t = std::map<ThreadId, uintmax_t>; 109 using thread_index_map_t = std::map<uintmax_t, ThreadId>; 110 using thread_vec_t = std::vector<Thread>; 111 using thread_data_t = std::vector<std::shared_ptr<ThreadData>>; 112 // functions 113 using initialize_func_t = std::function<void()>; 114 using finalize_func_t = std::function<void()>; 115 using affinity_func_t = std::function<intmax_t(intmax_t)>; 116 117 static affinity_func_t& affinity_functor() 118 { 119 static affinity_func_t _v = [](intmax_t) { 120 static std::atomic<intmax_t> assigned; 121 intmax_t _assign = assigned++; 122 return _assign % Thread::hardware_concurrency(); 123 }; 124 return _v; 125 } 126 127 static initialize_func_t& initialization_functor() 128 { 129 static initialize_func_t _v = []() {}; 130 return _v; 131 } 132 133 static finalize_func_t& finalization_functor() 134 { 135 static finalize_func_t _v = []() {}; 136 return _v; 137 } 138 139 struct Config 140 { 141 bool init = true; 142 bool use_tbb = false; 143 bool use_affinity = false; 144 int verbose = 0; 145 int priority = 0; 146 size_type pool_size = f_default_pool_size(); 147 VUserTaskQueue* task_queue = nullptr; 148 affinity_func_t set_affinity = affinity_functor(); 149 initialize_func_t initializer = initialization_functor(); 150 finalize_func_t finalizer = finalization_functor(); 151 }; 152 153 public: 154 // Constructor and Destructors 155 explicit ThreadPool(const Config&); 156 ~ThreadPool(); 157 ThreadPool(const ThreadPool&) = delete; 158 ThreadPool(ThreadPool&&) = default; 159 ThreadPool& operator=(const ThreadPool&) = delete; 160 ThreadPool& operator=(ThreadPool&&) = default; 161 162 public: 163 // 
Public functions 164 size_type initialize_threadpool(size_type); // start the threads 165 size_type destroy_threadpool(); // destroy the threads 166 size_type stop_thread(); 167 168 template <typename FuncT> 169 void execute_on_all_threads(FuncT&& _func); 170 171 template <typename FuncT> 172 void execute_on_specific_threads(const std::set<std::thread::id>& _tid, 173 FuncT&& _func); 174 175 task_queue_t* get_queue() const { return m_task_queue; } 176 task_queue_t*& get_valid_queue(task_queue_t*&) const; 177 178 bool is_tbb_threadpool() const { return m_tbb_tp; } 179 180 public: 181 /// set the default pool size 182 static void set_default_size(size_type _v) { f_default_pool_size() = _v; } 183 184 /// get the default pool size 185 static size_type get_default_size() { return f_default_pool_size(); } 186 187 public: 188 // add tasks for threads to process 189 size_type add_task(task_pointer&& task, int bin = -1); 190 // size_type add_thread_task(ThreadId id, task_pointer&& task); 191 // add a generic container with iterator 192 template <typename ListT> 193 size_type add_tasks(ListT&); 194 195 Thread* get_thread(size_type _n) const; 196 Thread* get_thread(std::thread::id id) const; 197 198 // only relevant when compiled with PTL_USE_TBB 199 static tbb_global_control_t*& tbb_global_control(); 200 201 void set_initialization(initialize_func_t f) { m_init_func = std::move(f); } 202 void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); } 203 204 void reset_initialization() 205 { 206 m_init_func = []() {}; 207 } 208 void reset_finalization() 209 { 210 m_fini_func = []() {}; 211 } 212 213 public: 214 // get the pool state 215 const pool_state_type& state() const { return m_pool_state; } 216 // see how many main task threads there are 217 size_type size() const { return m_pool_size; } 218 // set the thread pool size 219 void resize(size_type _n); 220 // affinity assigns threads to cores, assignment at constructor 221 bool using_affinity() const { return 
m_use_affinity; } 222 bool is_alive() { return m_alive_flag->load(); } 223 void notify(); 224 void notify_all(); 225 void notify(size_type); 226 bool is_initialized() const; 227 int get_active_threads_count() const { return (int)m_thread_awake->load(); } 228 229 void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); } 230 void set_affinity(intmax_t i, Thread&) const; 231 void set_priority(int _prio, Thread&) const; 232 233 void set_verbose(int n) { m_verbose = n; } 234 int get_verbose() const { return m_verbose; } 235 bool is_main() const { return ThisThread::get_id() == m_main_tid; } 236 237 tbb_task_arena_t* get_task_arena(); 238 239 public: 240 // read FORCE_NUM_THREADS environment variable 241 static const thread_id_map_t& get_thread_ids(); 242 static uintmax_t get_thread_id(ThreadId); 243 static uintmax_t get_this_thread_id(); 244 static uintmax_t add_thread_id(ThreadId = ThisThread::get_id()); 245 246 private: 247 void execute_thread(VUserTaskQueue*); // function thread sits in 248 int insert(task_pointer&&, int = -1); 249 int run_on_this(task_pointer&&); 250 251 private: 252 // called in THREAD INIT 253 static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1); 254 255 void record_entry(); 256 void record_exit(); 257 258 private: 259 // Private variables 260 // random 261 bool m_use_affinity = false; 262 bool m_tbb_tp = false; 263 bool m_delete_task_queue = false; 264 int m_verbose = 0; 265 int m_priority = 0; 266 size_type m_pool_size = 0; 267 ThreadId m_main_tid = ThisThread::get_id(); 268 atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false); 269 pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0); 270 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>(0); 271 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0); 272 273 // locks 274 lock_t m_task_lock = std::make_shared<Mutex>(); 275 // conditions 276 condition_t m_task_cond = 
std::make_shared<Condition>(); 277 278 // containers 279 bool_list_t m_is_joined = {}; // join list 280 bool_list_t m_is_stopped = {}; // lets thread know to stop 281 thread_list_t m_main_threads = {}; // storage for active threads 282 thread_list_t m_stop_threads = {}; // storage for stopped threads 283 thread_vec_t m_threads = {}; 284 thread_data_t m_thread_data = {}; 285 286 // task queue 287 task_queue_t* m_task_queue = nullptr; 288 tbb_task_arena_t* m_tbb_task_arena = nullptr; 289 tbb_task_group_t* m_tbb_task_group = nullptr; 290 291 // functions 292 initialize_func_t m_init_func = initialization_functor(); 293 finalize_func_t m_fini_func = finalization_functor(); 294 affinity_func_t m_affinity_func = affinity_functor(); 295 296 private: 297 static size_type& f_default_pool_size(); 298 static thread_id_map_t& f_thread_ids(); 299 }; 300 301 //--------------------------------------------------------------------------------------// 302 inline void 303 ThreadPool::notify() 304 { 305 // wake up one thread that is waiting for a task to be available 306 if(m_thread_awake->load() < m_pool_size) 307 { 308 AutoLock l(*m_task_lock); 309 m_task_cond->notify_one(); 310 } 311 } 312 //--------------------------------------------------------------------------------------// 313 inline void 314 ThreadPool::notify_all() 315 { 316 // wake all threads 317 AutoLock l(*m_task_lock); 318 m_task_cond->notify_all(); 319 } 320 //--------------------------------------------------------------------------------------// 321 inline void 322 ThreadPool::notify(size_type ntasks) 323 { 324 if(ntasks == 0) 325 return; 326 327 // wake up as many threads that tasks just added 328 if(m_thread_awake->load() < m_pool_size) 329 { 330 AutoLock l(*m_task_lock); 331 if(ntasks < this->size()) 332 { 333 for(size_type i = 0; i < ntasks; ++i) 334 m_task_cond->notify_one(); 335 } 336 else 337 { 338 m_task_cond->notify_all(); 339 } 340 } 341 } 342 
//--------------------------------------------------------------------------------------// 343 // local function for getting the tbb task scheduler 344 inline tbb_global_control_t*& 345 ThreadPool::tbb_global_control() 346 { 347 static thread_local tbb_global_control_t* _instance = nullptr; 348 return _instance; 349 } 350 //--------------------------------------------------------------------------------------// 351 // task arena 352 inline tbb_task_arena_t* 353 ThreadPool::get_task_arena() 354 { 355 #if defined(PTL_USE_TBB) 356 // create a task arena 357 if(!m_tbb_task_arena) 358 { 359 auto _sz = (tbb_global_control()) 360 ? tbb_global_control()->active_value( 361 tbb::global_control::max_allowed_parallelism) 362 : size(); 363 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{}); 364 m_tbb_task_arena->initialize(_sz, 1); 365 } 366 #else 367 if(!m_tbb_task_arena) 368 m_tbb_task_arena = new tbb_task_arena_t{}; 369 #endif 370 return m_tbb_task_arena; 371 } 372 //--------------------------------------------------------------------------------------// 373 inline void 374 ThreadPool::resize(size_type _n) 375 { 376 initialize_threadpool(_n); 377 if(m_task_queue) 378 m_task_queue->resize(static_cast<intmax_t>(_n)); 379 } 380 //--------------------------------------------------------------------------------------// 381 inline int 382 ThreadPool::run_on_this(task_pointer&& _task) 383 { 384 auto&& _func = [_task]() { (*_task)(); }; 385 386 if(m_tbb_tp && m_tbb_task_group) 387 { 388 auto* _arena = get_task_arena(); 389 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); }); 390 } 391 else 392 { 393 _func(); 394 } 395 // return the number of tasks added to task-list 396 return 0; 397 } 398 //--------------------------------------------------------------------------------------// 399 inline int 400 ThreadPool::insert(task_pointer&& task, int bin) 401 { 402 static thread_local ThreadData* _data = ThreadData::GetInstance(); 403 404 // pass the 
task to the queue 405 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin); 406 notify(); 407 return (int)ibin; 408 } 409 //--------------------------------------------------------------------------------------// 410 inline ThreadPool::size_type 411 ThreadPool::add_task(task_pointer&& task, int bin) 412 { 413 // if not native (i.e. TBB) or we haven't built thread-pool, just execute 414 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load()) 415 return static_cast<size_type>(run_on_this(std::move(task))); 416 417 return static_cast<size_type>(insert(std::move(task), bin)); 418 } 419 //--------------------------------------------------------------------------------------// 420 template <typename ListT> 421 inline ThreadPool::size_type 422 ThreadPool::add_tasks(ListT& c) 423 { 424 if(!m_alive_flag) // if we haven't built thread-pool, just execute 425 { 426 for(auto& itr : c) 427 run(itr); 428 c.clear(); 429 return 0; 430 } 431 432 // TODO: put a limit on how many tasks can be added at most 433 auto c_size = c.size(); 434 for(auto& itr : c) 435 { 436 if(!itr->is_native_task()) 437 --c_size; 438 else 439 { 440 //++(m_task_queue); 441 get_valid_queue(m_task_queue)->InsertTask(itr); 442 } 443 } 444 c.clear(); 445 446 // notify sleeping threads 447 notify(c_size); 448 449 return c_size; 450 } 451 //--------------------------------------------------------------------------------------// 452 template <typename FuncT> 453 inline void 454 ThreadPool::execute_on_all_threads(FuncT&& _func) 455 { 456 if(m_tbb_tp && m_tbb_task_group) 457 { 458 #if defined(PTL_USE_TBB) 459 // TBB lazily activates threads to process tasks and the main thread 460 // participates in processing the tasks so getting a specific 461 // function to execute only on the worker threads requires some trickery 462 // 463 std::set<std::thread::id> _first{}; 464 Mutex _mutex{}; 465 // init function which executes function and returns 1 only once 466 auto _init = [&]() { 467 
int _once = 0; 468 _mutex.lock(); 469 if(_first.find(std::this_thread::get_id()) == _first.end()) 470 { 471 // we need to reset this thread-local static for multiple invocations 472 // of the same template instantiation 473 _once = 1; 474 _first.insert(std::this_thread::get_id()); 475 } 476 _mutex.unlock(); 477 if(_once != 0) 478 { 479 _func(); 480 return 1; 481 } 482 return 0; 483 }; 484 // this will collect the number of threads which have 485 // executed the _init function above 486 std::atomic<size_t> _total_init{ 0 }; 487 // max parallelism by TBB 488 size_t _maxp = tbb_global_control()->active_value( 489 tbb::global_control::max_allowed_parallelism); 490 // create a task arean 491 auto* _arena = get_task_arena(); 492 // size of the thread-pool 493 size_t _sz = size(); 494 // number of cores 495 size_t _ncore = GetNumberOfCores(); 496 // maximum depth for recursion 497 size_t _dmax = std::max<size_t>(_ncore, 8); 498 // how many threads we need to initialize 499 size_t _num = std::min(_maxp, std::min(_sz, _ncore)); 500 // this is the task passed to the task-group 501 std::function<void()> _init_task; 502 _init_task = [&]() { 503 add_thread_id(); 504 static thread_local size_type _depth = 0; 505 int _ret = 0; 506 // don't let the main thread execute the function 507 if(!is_main()) 508 { 509 // execute the function 510 _ret = _init(); 511 // add the result 512 _total_init += _ret; 513 } 514 // if the function did not return anything, recursively execute 515 // two more tasks 516 ++_depth; 517 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num) 518 { 519 tbb::task_group tg{}; 520 tg.run([&]() { _init_task(); }); 521 tg.run([&]() { _init_task(); }); 522 ThisThread::sleep_for(std::chrono::milliseconds{ 1 }); 523 tg.wait(); 524 } 525 --_depth; 526 }; 527 528 // TBB won't oversubscribe so we need to limit by ncores - 1 529 size_t nitr = 0; 530 auto _fname = __FUNCTION__; 531 auto _write_info = [&]() { 532 std::cout << "[" << _fname << "]> Total initialized: " 
<< _total_init 533 << ", expected: " << _num << ", max-parallel: " << _maxp 534 << ", size: " << _sz << ", ncore: " << _ncore << std::endl; 535 }; 536 while(_total_init < _num) 537 { 538 auto _n = 2 * _num; 539 while(--_n > 0) 540 { 541 _arena->execute( 542 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); }); 543 } 544 _arena->execute([&]() { m_tbb_task_group->wait(); }); 545 // don't loop infinitely but use a strict condition 546 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num) 547 { 548 _write_info(); 549 break; 550 } 551 // at this point we need to exit 552 if(nitr > 4 * (_ncore + 1)) 553 { 554 _write_info(); 555 break; 556 } 557 } 558 if(get_verbose() > 3) 559 _write_info(); 560 #endif 561 } 562 else if(get_queue()) 563 { 564 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func)); 565 } 566 } 567 568 //--------------------------------------------------------------------------------------// 569 570 template <typename FuncT> 571 inline void 572 ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids, 573 FuncT&& _func) 574 { 575 if(m_tbb_tp && m_tbb_task_group) 576 { 577 #if defined(PTL_USE_TBB) 578 // TBB lazily activates threads to process tasks and the main thread 579 // participates in processing the tasks so getting a specific 580 // function to execute only on the worker threads requires some trickery 581 // 582 std::set<std::thread::id> _first{}; 583 Mutex _mutex{}; 584 // init function which executes function and returns 1 only once 585 auto _exec = [&]() { 586 int _once = 0; 587 _mutex.lock(); 588 if(_first.find(std::this_thread::get_id()) == _first.end()) 589 { 590 // we need to reset this thread-local static for multiple invocations 591 // of the same template instantiation 592 _once = 1; 593 _first.insert(std::this_thread::get_id()); 594 } 595 _mutex.unlock(); 596 if(_once != 0) 597 { 598 _func(); 599 return 1; 600 } 601 return 0; 602 }; 603 // this will collect the number of threads which have 604 // 
executed the _exec function above 605 std::atomic<size_t> _total_exec{ 0 }; 606 // number of cores 607 size_t _ncore = GetNumberOfCores(); 608 // maximum depth for recursion 609 size_t _dmax = std::max<size_t>(_ncore, 8); 610 // how many threads we need to initialize 611 size_t _num = _tids.size(); 612 // create a task arena 613 auto* _arena = get_task_arena(); 614 // this is the task passed to the task-group 615 std::function<void()> _exec_task; 616 _exec_task = [&]() { 617 add_thread_id(); 618 static thread_local size_type _depth = 0; 619 int _ret = 0; 620 auto _this_tid = std::this_thread::get_id(); 621 // don't let the main thread execute the function 622 if(_tids.count(_this_tid) > 0) 623 { 624 // execute the function 625 _ret = _exec(); 626 // add the result 627 _total_exec += _ret; 628 } 629 // if the function did not return anything, recursively execute 630 // two more tasks 631 ++_depth; 632 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num) 633 { 634 tbb::task_group tg{}; 635 tg.run([&]() { _exec_task(); }); 636 tg.run([&]() { _exec_task(); }); 637 ThisThread::sleep_for(std::chrono::milliseconds{ 1 }); 638 tg.wait(); 639 } 640 --_depth; 641 }; 642 643 // TBB won't oversubscribe so we need to limit by ncores - 1 644 size_t nitr = 0; 645 auto _fname = __FUNCTION__; 646 auto _write_info = [&]() { 647 std::cout << "[" << _fname << "]> Total executed: " << _total_exec 648 << ", expected: " << _num << ", size: " << size() << std::endl; 649 }; 650 while(_total_exec < _num) 651 { 652 auto _n = 2 * _num; 653 while(--_n > 0) 654 { 655 _arena->execute( 656 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); }); 657 } 658 _arena->execute([&]() { m_tbb_task_group->wait(); }); 659 // don't loop infinitely but use a strict condition 660 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num) 661 { 662 _write_info(); 663 break; 664 } 665 // at this point we need to exit 666 if(nitr > 8 * (_num + 1)) 667 { 668 _write_info(); 669 break; 670 } 671 } 672 
if(get_verbose() > 3) 673 _write_info(); 674 #endif 675 } 676 else if(get_queue()) 677 { 678 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func)); 679 } 680 } 681 682 //======================================================================================// 683 684 } // namespace PTL 685