//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------------------
// Tasking class implementation
//
// Class Description:
//
// This file creates a class for an efficient thread-pool that
// accepts work in the form of tasks.
//
// ---------------------------------------------------------------------------
// Author: Jonathan Madsen (Feb 13th 2018)
// ---------------------------------------------------------------------------

#include "PTL/ThreadPool.hh"
#include "PTL/GetEnv.hh"
#include "PTL/ScopeDestructor.hh"
#include "PTL/ThreadData.hh"
#include "PTL/Threading.hh"
#include "PTL/UserTaskQueue.hh"
#include "PTL/VUserTaskQueue.hh"

#include <cassert>
#include <mutex>
#include <new>
#include <stdexcept>
#include <thread>

//======================================================================================//

namespace
{
PTL::ThreadData*&
thread_data()
{
    return PTL::ThreadData::GetInstance();
}
}  // namespace

namespace PTL
{
//======================================================================================//

ThreadPool::thread_id_map_t&
ThreadPool::f_thread_ids()
{
    static auto _v = thread_id_map_t{};
    return _v;
}

//======================================================================================//

ThreadPool::size_type&
ThreadPool::f_default_pool_size()
{
    static size_type _v =
        GetEnv<size_type>("PTL_NUM_THREADS", Thread::hardware_concurrency());
    return _v;
}

//======================================================================================//
// static member function that calls the member function we want the thread to
// run
void
ThreadPool::start_thread(ThreadPool* tp, thread_data_t* _data, intmax_t _idx)
{
    if(tp->get_verbose() > 0)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] Starting thread " << _idx << "..." << std::endl;
    }

    auto _thr_data = std::make_shared<ThreadData>(tp);
    {
        AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
        if(!lock.owns_lock())
            lock.lock();
        if(_idx < 0)
            _idx = f_thread_ids().size();
        f_thread_ids()[std::this_thread::get_id()] = _idx;
        SetThreadId((int)_idx);
        _data->emplace_back(_thr_data);
    }
    thread_data() = _thr_data.get();
    tp->record_entry();
    tp->execute_thread(thread_data()->current_queue);
    tp->record_exit();

    if(tp->get_verbose() > 0)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] Thread " << _idx << " terminating..."
                  << std::endl;
    }
}

//======================================================================================//

const ThreadPool::thread_id_map_t&
ThreadPool::get_thread_ids()
{
    return f_thread_ids();
}

//======================================================================================//

uintmax_t
ThreadPool::get_thread_id(ThreadId _tid)
{
    uintmax_t _idx = 0;
    {
        AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
        if(!lock.owns_lock())
            lock.lock();
        auto itr = f_thread_ids().find(_tid);
        if(itr == f_thread_ids().end())
        {
            _idx                 = f_thread_ids().size();
            f_thread_ids()[_tid] = _idx;
        }
        else
        {
            _idx = itr->second;
        }
    }
    return _idx;
}

//======================================================================================//

uintmax_t
ThreadPool::get_this_thread_id()
{
    return get_thread_id(ThisThread::get_id());
}

//======================================================================================//

uintmax_t
ThreadPool::add_thread_id(ThreadId _tid)
{
    AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
    if(!lock.owns_lock())
        lock.lock();
    if(f_thread_ids().find(_tid) == f_thread_ids().end())
    {
        auto _idx            = f_thread_ids().size();
        f_thread_ids()[_tid] = _idx;
        SetThreadId((int)_idx);
    }
    return f_thread_ids().at(_tid);
}

//======================================================================================//

ThreadPool::ThreadPool(const Config& _cfg)
: m_use_affinity{ _cfg.use_affinity }
, m_tbb_tp{ _cfg.use_tbb }
, m_verbose{ _cfg.verbose }
, m_priority{ _cfg.priority }
, m_pool_state{ std::make_shared<std::atomic_short>(thread_pool::state::NONINIT) }
, m_task_queue{ _cfg.task_queue }
, m_init_func{ _cfg.initializer }
, m_fini_func{ _cfg.finalizer }
, m_affinity_func{ _cfg.set_affinity }
{
    auto master_id = get_this_thread_id();
    if(master_id != 0 && m_verbose > 1)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] ThreadPool created on non-master thread"
                  << std::endl;
    }

    thread_data() = new ThreadData(this);

    // initialize after get_this_thread_id so the master thread has id 0
    if(_cfg.init)
        this->initialize_threadpool(_cfg.pool_size);
}

//======================================================================================//

ThreadPool::~ThreadPool()
{
    if(m_alive_flag->load())
    {
        std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
                     "destroy_threadpool() before deleting the ThreadPool to "
                     "eliminate this message."
                  << std::endl;
        m_pool_state->store(thread_pool::state::STOPPED);
        m_task_lock->lock();
        m_task_cond->notify_all();
        m_task_lock->unlock();
        for(auto& itr : m_threads)
            itr.join();
        m_threads.clear();
    }

    // delete owned resources
    if(m_delete_task_queue)
        delete m_task_queue;

    delete m_tbb_task_arena;
    delete m_tbb_task_group;
}

//======================================================================================//

bool
ThreadPool::is_initialized() const
{
    return !(m_pool_state->load() == thread_pool::state::NONINIT);
}

//======================================================================================//

void
ThreadPool::record_entry()
{
    ++(*m_thread_active);
}

//======================================================================================//

void
ThreadPool::record_exit()
{
    --(*m_thread_active);
}

//======================================================================================//

void
ThreadPool::set_affinity(intmax_t i, Thread& _thread) const
{
    try
    {
        NativeThread native_thread = _thread.native_handle();
        intmax_t     _pin          = m_affinity_func(i);
        if(m_verbose > 0)
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] Setting pin affinity for thread "
                      << get_thread_id(_thread.get_id()) << " to " << _pin << std::endl;
        }
        SetPinAffinity((int)_pin, native_thread);
    } catch(std::runtime_error& e)
    {
        std::cerr << "[PTL::ThreadPool] Error setting pin affinity: " << e.what()
                  << std::endl;
    }
}

//======================================================================================//

void
ThreadPool::set_priority(int _prio, Thread& _thread) const
{
    try
    {
        NativeThread native_thread = _thread.native_handle();
        if(m_verbose > 0)
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] Setting priority for thread "
                      << get_thread_id(_thread.get_id()) << " to " << _prio
                      << std::endl;
        }
        SetThreadPriority(_prio, native_thread);
    } catch(std::runtime_error& e)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] Error setting thread priority: " << e.what()
                  << std::endl;
    }
}

//======================================================================================//
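
// Illustrative usage sketch for the constructor above: the pool is configured
// entirely through the Config object it receives. The field defaults are
// declared in "PTL/ThreadPool.hh"; the values shown here are assumptions made
// only for illustration.
//
//     PTL::ThreadPool::Config cfg{};
//     cfg.init      = true;   // spawn the worker threads inside the constructor
//     cfg.use_tbb   = false;  // use the native PTL backend rather than TBB
//     cfg.pool_size = 4;      // requested number of worker threads
//     PTL::ThreadPool tp{ cfg };
//
// Call destroy_threadpool() before the ThreadPool goes out of scope; otherwise
// the destructor above prints a warning and joins the workers itself.
//
//======================================================================================//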

ThreadPool::size_type
ThreadPool::initialize_threadpool(size_type proposed_size)
{
    //--------------------------------------------------------------------------//
    // return before initializing
    if(proposed_size < 1)
        return 0;

    //--------------------------------------------------------------------------//
    // store that the pool has been started
    if(!m_alive_flag->load())
        m_pool_state->store(thread_pool::state::STARTED);

#if defined(PTL_USE_TBB)
    //--------------------------------------------------------------------------//
    // handle tbb task scheduler
    if(m_tbb_tp)
    {
        m_tbb_tp                                = true;
        m_pool_size                             = proposed_size;
        tbb_global_control_t*& _global_control  = tbb_global_control();
        // delete if wrong size
        if(m_pool_size != proposed_size)
        {
            delete _global_control;
            _global_control = nullptr;
        }

        if(!_global_control)
        {
            _global_control = new tbb_global_control_t(
                tbb::global_control::max_allowed_parallelism, proposed_size + 1);
            if(m_verbose > 0)
            {
                AutoLock lock(TypeMutex<decltype(std::cerr)>());
                std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] initialized with "
                          << m_pool_size << " threads." << std::endl;
            }
        }

        // create task group (used for async)
        if(!m_tbb_task_group)
        {
            m_tbb_task_group = new tbb_task_group_t{};
            execute_on_all_threads([this]() { m_init_func(); });
        }

        return m_pool_size;
    }
#endif

    m_alive_flag->store(true);

    //--------------------------------------------------------------------------//
    // if started, stop some threads if smaller or return if equal
    if(m_pool_state->load() == thread_pool::state::STARTED)
    {
        if(m_pool_size > proposed_size)
        {
            while(stop_thread() > proposed_size)
                ;
            if(m_verbose > 0)
            {
                AutoLock lock(TypeMutex<decltype(std::cerr)>());
                std::cerr << "[PTL::ThreadPool] ThreadPool initialized with "
                          << m_pool_size << " threads." << std::endl;
            }
            if(!m_task_queue)
            {
                m_delete_task_queue = true;
                m_task_queue        = new UserTaskQueue(m_pool_size);
            }
            else
            {
                m_task_queue->resize(m_pool_size);
            }
            return m_pool_size;
        }
        else if(m_pool_size == proposed_size)
        {
            if(m_verbose > 0)
            {
                AutoLock lock(TypeMutex<decltype(std::cerr)>());
                std::cerr << "ThreadPool initialized with " << m_pool_size << " threads."
                          << std::endl;
            }
            if(!m_task_queue)
            {
                m_delete_task_queue = true;
                m_task_queue        = new UserTaskQueue(m_pool_size);
            }
            return m_pool_size;
        }
    }

    //--------------------------------------------------------------------------//
    // reserve enough space to prevent reallocation later
    {
        AutoLock _task_lock(*m_task_lock);
        m_is_joined.reserve(proposed_size);
    }

    if(!m_task_queue)
    {
        m_delete_task_queue = true;
        m_task_queue        = new UserTaskQueue(proposed_size);
    }

    auto this_tid = get_this_thread_id();
    for(size_type i = m_pool_size; i < proposed_size; ++i)
    {
        // add the threads
        try
        {
            // create thread
            Thread thr{ ThreadPool::start_thread, this, &m_thread_data,
                        this_tid + i + 1 };
            // only reaches here if successful
            ++m_pool_size;
            // store thread
            m_main_threads.push_back(thr.get_id());
            // list of joined thread booleans
            m_is_joined.push_back(false);
            // set the affinity
            if(m_use_affinity)
                set_affinity(i, thr);
            set_priority(m_priority, thr);
            // store
            m_threads.emplace_back(std::move(thr));
        } catch(std::runtime_error& e)
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] " << e.what()
                      << std::endl;  // issue creating thread
            continue;
        } catch(std::bad_alloc& e)
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] " << e.what() << std::endl;
            continue;
        }
    }
    //--------------------------------------------------------------------------//

    AutoLock _task_lock(*m_task_lock);

    // thread pool size doesn't match the join vector;
    // this will screw up joining later
    if(m_is_joined.size() != m_main_threads.size())
    {
        std::stringstream ss;
        ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
           << "is a different size than the thread-id vector: " << m_is_joined.size()
           << " vs. " << m_main_threads.size() << " (tid: " << std::this_thread::get_id()
           << ")";

        throw std::runtime_error(ss.str());
    }

    if(m_verbose > 0)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
                  << " threads." << std::endl;
    }

    return m_main_threads.size();
}

//======================================================================================//

ThreadPool::size_type
ThreadPool::destroy_threadpool()
{
    // Note: this is not for synchronization, its for thread communication!
    // destroy_threadpool() will only be called from the main thread, yet
    // the modified m_pool_state may not show up to other threads until its
    // modified in a lock!
    //--------------------------------------------------------------------------//
    m_pool_state->store(thread_pool::state::STOPPED);

    //--------------------------------------------------------------------------//
    // handle tbb task scheduler
#if defined(PTL_USE_TBB)
    if(m_tbb_task_group)
    {
        execute_on_all_threads([this]() { m_fini_func(); });
        auto _func = [&]() { m_tbb_task_group->wait(); };
        if(m_tbb_task_arena)
            m_tbb_task_arena->execute(_func);
        else
            _func();
        delete m_tbb_task_group;
        m_tbb_task_group = nullptr;
    }
    if(m_tbb_task_arena)
    {
        delete m_tbb_task_arena;
        m_tbb_task_arena = nullptr;
    }
    if(m_tbb_tp && tbb_global_control())
    {
        tbb_global_control_t*& _global_control = tbb_global_control();
        delete _global_control;
        _global_control = nullptr;
        m_tbb_tp        = false;
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        if(m_verbose > 0)
        {
            std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
        }
    }
#endif

    if(!m_alive_flag->load())
        return 0;

    //--------------------------------------------------------------------------//
    // notify all threads we are shutting down
    m_task_lock->lock();
    m_task_cond->notify_all();
    m_task_lock->unlock();
    //--------------------------------------------------------------------------//

    if(m_is_joined.size() != m_main_threads.size())
    {
        std::stringstream ss;
        ss << "   ThreadPool::destroy_thread_pool - boolean is_joined vector "
           << "is a different size than the thread-id vector: " << m_is_joined.size()
           << " vs. " << m_main_threads.size() << " (tid: " << std::this_thread::get_id()
           << ")";

        throw std::runtime_error(ss.str());
    }

    for(size_type i = 0; i < m_is_joined.size(); i++)
    {
        //----------------------------------------------------------------------//
        //
        if(i < m_threads.size())
            m_threads.at(i).join();

        //----------------------------------------------------------------------//
        // if its joined already, nothing else needs to be done
        if(m_is_joined.at(i))
            continue;

        //----------------------------------------------------------------------//
        // join
        if(std::this_thread::get_id() == m_main_threads[i])
            continue;

        //----------------------------------------------------------------------//
        // thread id and index
        auto _tid = m_main_threads[i];

        //----------------------------------------------------------------------//
        // erase thread from thread ID list
        if(f_thread_ids().find(_tid) != f_thread_ids().end())
            f_thread_ids().erase(f_thread_ids().find(_tid));

        //----------------------------------------------------------------------//
        // it's joined
        m_is_joined.at(i) = true;
    }

    m_thread_data.clear();
    m_threads.clear();
    m_main_threads.clear();
    m_is_joined.clear();

    m_alive_flag->store(false);

    auto start   = std::chrono::steady_clock::now();
    auto elapsed = std::chrono::duration<double>{};
    // wait a maximum of 30 seconds for threads to exit
    while(m_thread_active->load() > 0 && elapsed.count() < 30)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        elapsed = std::chrono::steady_clock::now() - start;
    }

    auto _active = m_thread_active->load();

    if(get_verbose() > 0)
    {
        if(_active == 0)
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
        }
        else
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] ThreadPool destroyed but " << _active
                      << " threads might still be running..."
                      << std::endl;
        }
    }

    if(m_delete_task_queue)
    {
        delete m_task_queue;
        m_task_queue = nullptr;
    }

    return 0;
}

//======================================================================================//

ThreadPool::size_type
ThreadPool::stop_thread()
{
    if(!m_alive_flag->load() || m_pool_size == 0)
        return 0;

    m_pool_state->store(thread_pool::state::PARTIAL);

    //--------------------------------------------------------------------------//
    // notify all threads we are shutting down
    m_task_lock->lock();
    m_is_stopped.push_back(true);
    m_task_cond->notify_one();
    m_task_lock->unlock();
    //--------------------------------------------------------------------------//

    while(!m_is_stopped.empty() && m_stop_threads.empty())
        ;

    // lock up the task queue
    AutoLock _task_lock(*m_task_lock);

    while(!m_stop_threads.empty())
    {
        auto tid = m_stop_threads.front();
        // remove from stopped
        m_stop_threads.pop_front();
        // remove from main
        for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
        {
            if(*itr == tid)
            {
                m_main_threads.erase(itr);
                break;
            }
        }
        // remove from join list
        m_is_joined.pop_back();
    }

    m_pool_state->store(thread_pool::state::STARTED);

    m_pool_size = m_main_threads.size();
    return m_main_threads.size();
}

//======================================================================================//

ThreadPool::task_queue_t*&
ThreadPool::get_valid_queue(task_queue_t*& _queue) const
{
    if(!_queue)
        _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
    return _queue;
}
//======================================================================================//

// Temporary workaround for shared_ptr construction
#if defined(__APPLE__) && defined(__amd64) && defined(__clang__)
[[clang::optnone]]
#endif
void
ThreadPool::execute_thread(VUserTaskQueue* _task_queue)
{
    ++(*m_thread_awake);

    // initialization function
    m_init_func();
    // finalization function (executed when scope is destroyed)
    ScopeDestructor _fini{ [this]() { m_fini_func(); } };

    ThreadId    tid  = ThisThread::get_id();
    ThreadData* data = thread_data();
    // auto     thread_bin = _task_queue->GetThreadBin();
    // auto     workers    = _task_queue->workers();

    auto start   = std::chrono::steady_clock::now();
    auto elapsed = std::chrono::duration<double>{};
    // check for updates for 60 seconds max
    while(!_task_queue && elapsed.count() < 60)
    {
        elapsed = std::chrono::steady_clock::now() - start;
        data->update();
        _task_queue = data->current_queue;
    }

    if(!_task_queue)
    {
        --(*m_thread_awake);
        throw std::runtime_error("No task queue was found after 60 seconds!");
    }

    assert(data->current_queue != nullptr);
    assert(_task_queue == data->current_queue);

    // essentially a dummy run
    if(_task_queue)
    {
        data->within_task = true;
        auto _task        = _task_queue->GetTask();
        if(_task)
        {
            (*_task)();
        }
        data->within_task = false;
    }

    // threads stay in this loop forever until the pool is destroyed
    while(true)
    {
        static thread_local auto p_task_lock = m_task_lock;

        //----------------------------------------------------------------------//
        // Try to pick a task
        AutoLock _task_lock(*p_task_lock, std::defer_lock);
        //----------------------------------------------------------------------//

        auto leave_pool = [&]() {
            auto _state      = [&]() { return static_cast<int>(m_pool_state->load()); };
            auto _pool_state = _state();
            if(_pool_state > 0)
            {
                // stop whole pool
                if(_pool_state == thread_pool::state::STOPPED)
                {
                    if(_task_lock.owns_lock())
                        _task_lock.unlock();
                    return true;
                }
                // single thread stoppage
                else if(_pool_state == thread_pool::state::PARTIAL)
                {
                    if(!_task_lock.owns_lock())
                        _task_lock.lock();
                    if(!m_is_stopped.empty() && m_is_stopped.back())
                    {
                        m_stop_threads.push_back(tid);
                        m_is_stopped.pop_back();
                        if(_task_lock.owns_lock())
                            _task_lock.unlock();
                        // exit entire function
                        return true;
                    }
                    if(_task_lock.owns_lock())
                        _task_lock.unlock();
                }
            }
            return false;
        };

        // We need to put condition.wait() in a loop for two reasons:
        // 1. There can be spurious wake-ups (due to signal/EINTR)
        // 2. When mutex is released for waiting, another thread can be woken up
        //    from a signal/broadcast and that thread can mess the condition.
        //    So when the current thread wakes up the condition may no longer be
        //    actually true!
        while(_task_queue->empty())
        {
            auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
            auto _size  = [&]() { return _task_queue->true_size(); };
            auto _empty = [&]() { return _task_queue->empty(); };
            auto _wake  = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };

            if(leave_pool())
                return;

            if(_task_queue->true_size() == 0)
            {
                if(m_thread_awake->load() > 0)
                    --(*m_thread_awake);

                // lock before sleeping on condition
                if(!_task_lock.owns_lock())
                    _task_lock.lock();

                // Wait until there is a task in the queue
                // Unlocks mutex while waiting, then locks it back when signaled
                // use lambda to control waking
                m_task_cond->wait(_task_lock, _wake);

                if(_state() == thread_pool::state::STOPPED)
                    return;

                // unlock if owned
                if(_task_lock.owns_lock())
                    _task_lock.unlock();

                // notify that is awake
                if(m_thread_awake->load() < m_pool_size)
                    ++(*m_thread_awake);
            }
            else
                break;
        }

        // release the lock
        if(_task_lock.owns_lock())
            _task_lock.unlock();

        //----------------------------------------------------------------------//

        // leave pool if conditions dictate it
        if(leave_pool())
            return;

        // activate guard against recursive deadlock
        data->within_task = true;
        //----------------------------------------------------------------------//

        // execute the task(s)
        while(!_task_queue->empty())
        {
            auto _task = _task_queue->GetTask();
            if(_task)
            {
                (*_task)();
            }
        }
        //----------------------------------------------------------------------//

        // disable guard against recursive deadlock
        data->within_task = false;
        //----------------------------------------------------------------------//
    }
}

//======================================================================================//

}  // namespace PTL
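
//======================================================================================//
// Illustrative lifecycle sketch using only the members defined in this file.
// The deferred-initialization pattern and the pool size are assumptions made
// for illustration; the actual Config defaults are declared in PTL/ThreadPool.hh.
//
//     PTL::ThreadPool::Config cfg{};
//     cfg.init = false;                        // defer thread creation
//     PTL::ThreadPool tp{ cfg };
//
//     tp.initialize_threadpool(8);             // spawn workers via start_thread()
//     auto id = PTL::ThreadPool::get_this_thread_id();  // 0 on the master thread
//     // ... submit work through the pool's task queue ...
//     tp.destroy_threadpool();                 // notify, join, and clean up workers
//
//======================================================================================//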