Geant4 Cross Reference |
1 // 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 19 // --------------------------------------------------------------- 20 // Tasking class implementation 21 // 22 // Class Description: 23 // 24 // This file creates a class for an efficient thread-pool that 25 // accepts work in the form of tasks. 26 // 27 // --------------------------------------------------------------- 28 // Author: Jonathan Madsen (Feb 13th 2018) 29 // --------------------------------------------------------------- 30 31 #include "PTL/ThreadPool.hh" 32 #include "PTL/GetEnv.hh" 33 #include "PTL/ScopeDestructor.hh" 34 #include "PTL/ThreadData.hh" 35 #include "PTL/Threading.hh" 36 #include "PTL/UserTaskQueue.hh" 37 #include "PTL/VUserTaskQueue.hh" 38 39 #include <cassert> 40 #include <mutex> 41 #include <new> 42 #include <stdexcept> 43 #include <thread> 44 45 //======================================================================================// 46 47 namespace 48 { 49 PTL::ThreadData*& 50 thread_data() 51 { 52 return PTL::ThreadData::GetInstance(); 53 } 54 } // namespace 55 56 namespace PTL 57 { 58 //======================================================================================// 59 60 ThreadPool::thread_id_map_t& 61 ThreadPool::f_thread_ids() 62 { 63 static auto _v = thread_id_map_t{}; 64 return _v; 65 } 66 67 //======================================================================================// 68 69 ThreadPool::size_type& 70 ThreadPool::f_default_pool_size() 71 { 72 static size_type _v = 73 GetEnv<size_type>("PTL_NUM_THREADS", Thread::hardware_concurrency()); 74 return _v; 75 } 76 77 //======================================================================================// 78 // static member function that calls the member function we want the thread to 79 // run 80 void 81 ThreadPool::start_thread(ThreadPool* tp, thread_data_t* _data, intmax_t _idx) 82 { 83 if(tp->get_verbose() > 0) 84 { 85 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 86 std::cerr << "[PTL::ThreadPool] Starting thread " << _idx << "..." << std::endl; 87 } 88 89 auto _thr_data = std::make_shared<ThreadData>(tp); 90 { 91 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock); 92 if(!lock.owns_lock()) 93 lock.lock(); 94 if(_idx < 0) 95 _idx = f_thread_ids().size(); 96 f_thread_ids()[std::this_thread::get_id()] = _idx; 97 SetThreadId((int)_idx); 98 _data->emplace_back(_thr_data); 99 } 100 thread_data() = _thr_data.get(); 101 tp->record_entry(); 102 tp->execute_thread(thread_data()->current_queue); 103 tp->record_exit(); 104 105 if(tp->get_verbose() > 0) 106 { 107 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 108 std::cerr << "[PTL::ThreadPool] Thread " << _idx << " terminating..." 109 << std::endl; 110 } 111 } 112 113 //======================================================================================// 114 115 const ThreadPool::thread_id_map_t& 116 ThreadPool::get_thread_ids() 117 { 118 return f_thread_ids(); 119 } 120 121 //======================================================================================// 122 123 uintmax_t 124 ThreadPool::get_thread_id(ThreadId _tid) 125 { 126 uintmax_t _idx = 0; 127 { 128 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock); 129 if(!lock.owns_lock()) 130 lock.lock(); 131 auto itr = f_thread_ids().find(_tid); 132 if(itr == f_thread_ids().end()) 133 { 134 _idx = f_thread_ids().size(); 135 f_thread_ids()[_tid] = _idx; 136 } 137 else 138 { 139 _idx = itr->second; 140 } 141 } 142 return _idx; 143 } 144 145 //======================================================================================// 146 147 uintmax_t 148 ThreadPool::get_this_thread_id() 149 { 150 return get_thread_id(ThisThread::get_id()); 151 } 152 153 //======================================================================================// 154 155 uintmax_t 156 ThreadPool::add_thread_id(ThreadId _tid) 157 { 158 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock); 159 if(!lock.owns_lock()) 160 lock.lock(); 161 if(f_thread_ids().find(_tid) == f_thread_ids().end()) 162 { 163 auto _idx = f_thread_ids().size(); 164 f_thread_ids()[_tid] = _idx; 165 SetThreadId((int)_idx); 166 } 167 return f_thread_ids().at(_tid); 168 } 169 170 //======================================================================================// 171 172 ThreadPool::ThreadPool(const Config& _cfg) 173 : m_use_affinity{ _cfg.use_affinity } 174 , m_tbb_tp{ _cfg.use_tbb } 175 , m_verbose{ _cfg.verbose } 176 , m_priority{ _cfg.priority } 177 , m_pool_state{ std::make_shared<std::atomic_short>(thread_pool::state::NONINIT) } 178 , m_task_queue{ _cfg.task_queue } 179 , m_init_func{ _cfg.initializer } 180 , m_fini_func{ _cfg.finalizer } 181 , m_affinity_func{ _cfg.set_affinity } 182 { 183 auto master_id = get_this_thread_id(); 184 if(master_id != 0 && m_verbose > 1) 185 { 186 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 187 std::cerr << "[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl; 188 } 189 190 thread_data() = new ThreadData(this); 191 192 // initialize after get_this_thread_id so master is zero 193 if(_cfg.init) 194 this->initialize_threadpool(_cfg.pool_size); 195 } 196 197 //======================================================================================// 198 199 ThreadPool::~ThreadPool() 200 { 201 if(m_alive_flag->load()) 202 { 203 std::cerr << "Warning! ThreadPool was not properly destroyed! Call " 204 "destroy_threadpool() before deleting the ThreadPool object to " 205 "eliminate this message." 206 << std::endl; 207 m_pool_state->store(thread_pool::state::STOPPED); 208 m_task_lock->lock(); 209 m_task_cond->notify_all(); 210 m_task_lock->unlock(); 211 for(auto& itr : m_threads) 212 itr.join(); 213 m_threads.clear(); 214 } 215 216 // delete owned resources 217 if(m_delete_task_queue) 218 delete m_task_queue; 219 220 delete m_tbb_task_arena; 221 delete m_tbb_task_group; 222 } 223 224 //======================================================================================// 225 226 bool 227 ThreadPool::is_initialized() const 228 { 229 return !(m_pool_state->load() == thread_pool::state::NONINIT); 230 } 231 232 //======================================================================================// 233 234 void 235 ThreadPool::record_entry() 236 { 237 ++(*m_thread_active); 238 } 239 240 //======================================================================================// 241 242 void 243 ThreadPool::record_exit() 244 { 245 --(*m_thread_active); 246 } 247 248 //======================================================================================// 249 250 void 251 ThreadPool::set_affinity(intmax_t i, Thread& _thread) const 252 { 253 try 254 { 255 NativeThread native_thread = _thread.native_handle(); 256 intmax_t _pin = m_affinity_func(i); 257 if(m_verbose > 0) 258 { 259 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 260 std::cerr << "[PTL::ThreadPool] Setting pin affinity for thread " 261 << get_thread_id(_thread.get_id()) << " to " << _pin << std::endl; 262 } 263 SetPinAffinity((int)_pin, native_thread); 264 } catch(std::runtime_error& e) 265 { 266 std::cerr << "[PTL::ThreadPool] Error setting pin affinity: " << e.what() 267 << std::endl; 268 } 269 } 270 271 //======================================================================================// 272 273 void 274 ThreadPool::set_priority(int _prio, Thread& _thread) const 275 { 276 try 277 { 278 NativeThread native_thread = _thread.native_handle(); 279 if(m_verbose > 0) 280 { 281 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 282 std::cerr << "[PTL::ThreadPool] Setting thread " 283 << get_thread_id(_thread.get_id()) << " priority to " << _prio 284 << std::endl; 285 } 286 SetThreadPriority(_prio, native_thread); 287 } catch(std::runtime_error& e) 288 { 289 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 290 std::cerr << "[PTL::ThreadPool] Error setting thread priority: " << e.what() 291 << std::endl; 292 } 293 } 294 295 //======================================================================================// 296 297 ThreadPool::size_type 298 ThreadPool::initialize_threadpool(size_type proposed_size) 299 { 300 //--------------------------------------------------------------------// 301 // return before initializing 302 if(proposed_size < 1) 303 return 0; 304 305 //--------------------------------------------------------------------// 306 // store that has been started 307 if(!m_alive_flag->load()) 308 m_pool_state->store(thread_pool::state::STARTED); 309 310 #if defined(PTL_USE_TBB) 311 //--------------------------------------------------------------------// 312 // handle tbb task scheduler 313 if(m_tbb_tp) 314 { 315 m_tbb_tp = true; 316 m_pool_size = proposed_size; 317 tbb_global_control_t*& _global_control = tbb_global_control(); 318 // delete if wrong size 319 if(m_pool_size != proposed_size) 320 { 321 delete _global_control; 322 _global_control = nullptr; 323 } 324 325 if(!_global_control) 326 { 327 _global_control = new tbb_global_control_t( 328 tbb::global_control::max_allowed_parallelism, proposed_size + 1); 329 if(m_verbose > 0) 330 { 331 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 332 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] initialized with " 333 << m_pool_size << " threads." << std::endl; 334 } 335 } 336 337 // create task group (used for async) 338 if(!m_tbb_task_group) 339 { 340 m_tbb_task_group = new tbb_task_group_t{}; 341 execute_on_all_threads([this]() { m_init_func(); }); 342 } 343 344 return m_pool_size; 345 } 346 #endif 347 348 m_alive_flag->store(true); 349 350 //--------------------------------------------------------------------// 351 // if started, stop some thread if smaller or return if equal 352 if(m_pool_state->load() == thread_pool::state::STARTED) 353 { 354 if(m_pool_size > proposed_size) 355 { 356 while(stop_thread() > proposed_size) 357 ; 358 if(m_verbose > 0) 359 { 360 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 361 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " 362 << m_pool_size << " threads." << std::endl; 363 } 364 if(!m_task_queue) 365 { 366 m_delete_task_queue = true; 367 m_task_queue = new UserTaskQueue(m_pool_size); 368 } 369 else 370 { 371 m_task_queue->resize(m_pool_size); 372 } 373 return m_pool_size; 374 } 375 else if(m_pool_size == proposed_size) // NOLINT 376 { 377 if(m_verbose > 0) 378 { 379 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 380 std::cerr << "ThreadPool initialized with " << m_pool_size << " threads." 381 << std::endl; 382 } 383 if(!m_task_queue) 384 { 385 m_delete_task_queue = true; 386 m_task_queue = new UserTaskQueue(m_pool_size); 387 } 388 return m_pool_size; 389 } 390 } 391 392 //--------------------------------------------------------------------// 393 // reserve enough space to prevent realloc later 394 { 395 AutoLock _task_lock(*m_task_lock); 396 m_is_joined.reserve(proposed_size); 397 } 398 399 if(!m_task_queue) 400 { 401 m_delete_task_queue = true; 402 m_task_queue = new UserTaskQueue(proposed_size); 403 } 404 405 auto this_tid = get_this_thread_id(); 406 for(size_type i = m_pool_size; i < proposed_size; ++i) 407 { 408 // add the threads 409 try 410 { 411 // create thread 412 Thread thr{ ThreadPool::start_thread, this, &m_thread_data, 413 this_tid + i + 1 }; 414 // only reaches here if successful creation of thread 415 ++m_pool_size; 416 // store thread 417 m_main_threads.push_back(thr.get_id()); 418 // list of joined thread booleans 419 m_is_joined.push_back(false); 420 // set the affinity 421 if(m_use_affinity) 422 set_affinity(i, thr); 423 set_priority(m_priority, thr); 424 // store 425 m_threads.emplace_back(std::move(thr)); 426 } catch(std::runtime_error& e) 427 { 428 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 429 std::cerr << "[PTL::ThreadPool] " << e.what() 430 << std::endl; // issue creating thread 431 continue; 432 } catch(std::bad_alloc& e) 433 { 434 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 435 std::cerr << "[PTL::ThreadPool] " << e.what() << std::endl; 436 continue; 437 } 438 } 439 //------------------------------------------------------------------------// 440 441 AutoLock _task_lock(*m_task_lock); 442 443 // thread pool size doesn't match with join vector 444 // this will screw up joining later 445 if(m_is_joined.size() != m_main_threads.size()) 446 { 447 std::stringstream ss; 448 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector " 449 << "is a different size than threads vector: " << m_is_joined.size() << " vs. " 450 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")"; 451 452 throw std::runtime_error(ss.str()); 453 } 454 455 if(m_verbose > 0) 456 { 457 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 458 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size 459 << " threads." << std::endl; 460 } 461 462 return m_main_threads.size(); 463 } 464 465 //======================================================================================// 466 467 ThreadPool::size_type 468 ThreadPool::destroy_threadpool() 469 { 470 // Note: this is not for synchronization, its for thread communication! 471 // destroy_threadpool() will only be called from the main thread, yet 472 // the modified m_pool_state may not show up to other threads until its 473 // modified in a lock! 474 //------------------------------------------------------------------------// 475 m_pool_state->store(thread_pool::state::STOPPED); 476 477 //--------------------------------------------------------------------// 478 // handle tbb task scheduler 479 #if defined(PTL_USE_TBB) 480 if(m_tbb_task_group) 481 { 482 execute_on_all_threads([this]() { m_fini_func(); }); 483 auto _func = [&]() { m_tbb_task_group->wait(); }; 484 if(m_tbb_task_arena) 485 m_tbb_task_arena->execute(_func); 486 else 487 _func(); 488 delete m_tbb_task_group; 489 m_tbb_task_group = nullptr; 490 } 491 if(m_tbb_task_arena) 492 { 493 delete m_tbb_task_arena; 494 m_tbb_task_arena = nullptr; 495 } 496 if(m_tbb_tp && tbb_global_control()) 497 { 498 tbb_global_control_t*& _global_control = tbb_global_control(); 499 delete _global_control; 500 _global_control = nullptr; 501 m_tbb_tp = false; 502 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 503 if(m_verbose > 0) 504 { 505 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl; 506 } 507 } 508 #endif 509 510 if(!m_alive_flag->load()) 511 return 0; 512 513 //------------------------------------------------------------------------// 514 // notify all threads we are shutting down 515 m_task_lock->lock(); 516 m_task_cond->notify_all(); 517 m_task_lock->unlock(); 518 //------------------------------------------------------------------------// 519 520 if(m_is_joined.size() != m_main_threads.size()) 521 { 522 std::stringstream ss; 523 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector " 524 << "is a different size than threads vector: " << m_is_joined.size() << " vs. " 525 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")"; 526 527 throw std::runtime_error(ss.str()); 528 } 529 530 for(size_type i = 0; i < m_is_joined.size(); i++) 531 { 532 //--------------------------------------------------------------------// 533 // 534 if(i < m_threads.size()) 535 m_threads.at(i).join(); 536 537 //--------------------------------------------------------------------// 538 // if its joined already, nothing else needs to be done 539 if(m_is_joined.at(i)) 540 continue; 541 542 //--------------------------------------------------------------------// 543 // join 544 if(std::this_thread::get_id() == m_main_threads[i]) 545 continue; 546 547 //--------------------------------------------------------------------// 548 // thread id and index 549 auto _tid = m_main_threads[i]; 550 551 //--------------------------------------------------------------------// 552 // erase thread from thread ID list 553 if(f_thread_ids().find(_tid) != f_thread_ids().end()) 554 f_thread_ids().erase(f_thread_ids().find(_tid)); 555 556 //--------------------------------------------------------------------// 557 // it's joined 558 m_is_joined.at(i) = true; 559 } 560 561 m_thread_data.clear(); 562 m_threads.clear(); 563 m_main_threads.clear(); 564 m_is_joined.clear(); 565 566 m_alive_flag->store(false); 567 568 auto start = std::chrono::steady_clock::now(); 569 auto elapsed = std::chrono::duration<double>{}; 570 // wait maximum of 30 seconds for threads to exit 571 while(m_thread_active->load() > 0 && elapsed.count() < 30) 572 { 573 std::this_thread::sleep_for(std::chrono::milliseconds(50)); 574 elapsed = std::chrono::steady_clock::now() - start; 575 } 576 577 auto _active = m_thread_active->load(); 578 579 if(get_verbose() > 0) 580 { 581 if(_active == 0) 582 { 583 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 584 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed" << std::endl; 585 } 586 else 587 { 588 AutoLock lock(TypeMutex<decltype(std::cerr)>()); 589 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed but " << _active 590 << " threads might still be active (and cause a termination error)" 591 << std::endl; 592 } 593 } 594 595 if(m_delete_task_queue) 596 { 597 delete m_task_queue; 598 m_task_queue = nullptr; 599 } 600 601 return 0; 602 } 603 604 //======================================================================================// 605 606 ThreadPool::size_type 607 ThreadPool::stop_thread() 608 { 609 if(!m_alive_flag->load() || m_pool_size == 0) 610 return 0; 611 612 m_pool_state->store(thread_pool::state::PARTIAL); 613 614 //------------------------------------------------------------------------// 615 // notify all threads we are shutting down 616 m_task_lock->lock(); 617 m_is_stopped.push_back(true); 618 m_task_cond->notify_one(); 619 m_task_lock->unlock(); 620 //------------------------------------------------------------------------// 621 622 while(!m_is_stopped.empty() && m_stop_threads.empty()) 623 ; 624 625 // lock up the task queue 626 AutoLock _task_lock(*m_task_lock); 627 628 while(!m_stop_threads.empty()) 629 { 630 auto tid = m_stop_threads.front(); 631 // remove from stopped 632 m_stop_threads.pop_front(); 633 // remove from main 634 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr) 635 { 636 if(*itr == tid) 637 { 638 m_main_threads.erase(itr); 639 break; 640 } 641 } 642 // remove from join list 643 m_is_joined.pop_back(); 644 } 645 646 m_pool_state->store(thread_pool::state::STARTED); 647 648 m_pool_size = m_main_threads.size(); 649 return m_main_threads.size(); 650 } 651 652 //======================================================================================// 653 654 ThreadPool::task_queue_t*& 655 ThreadPool::get_valid_queue(task_queue_t*& _queue) const 656 { 657 if(!_queue) 658 _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) }; 659 return _queue; 660 } 661 //======================================================================================// 662 663 // Temporary workaround for shared_ptr constructor GPFLT on Intel Macs and Clang 15 664 #if defined (__APPLE__) && defined(__amd64) && defined(__clang__) 665 [[clang::optnone]] 666 #endif 667 void 668 ThreadPool::execute_thread(VUserTaskQueue* _task_queue) 669 { 670 ++(*m_thread_awake); 671 672 // initialization function 673 m_init_func(); 674 // finalization function (executed when scope is destroyed) 675 ScopeDestructor _fini{ [this]() { m_fini_func(); } }; 676 677 ThreadId tid = ThisThread::get_id(); 678 ThreadData* data = thread_data(); 679 // auto thread_bin = _task_queue->GetThreadBin(); 680 // auto workers = _task_queue->workers(); 681 682 auto start = std::chrono::steady_clock::now(); 683 auto elapsed = std::chrono::duration<double>{}; 684 // check for updates for 60 seconds max 685 while(!_task_queue && elapsed.count() < 60) 686 { 687 elapsed = std::chrono::steady_clock::now() - start; 688 data->update(); 689 _task_queue = data->current_queue; 690 } 691 692 if(!_task_queue) 693 { 694 --(*m_thread_awake); 695 throw std::runtime_error("No task queue was found after 60 seconds!"); 696 } 697 698 assert(data->current_queue != nullptr); 699 assert(_task_queue == data->current_queue); 700 701 // essentially a dummy run 702 if(_task_queue) 703 { 704 data->within_task = true; 705 auto _task = _task_queue->GetTask(); 706 if(_task) 707 { 708 (*_task)(); 709 } 710 data->within_task = false; 711 } 712 713 // threads stay in this loop forever until thread-pool destroyed 714 while(true) 715 { 716 static thread_local auto p_task_lock = m_task_lock; 717 718 //--------------------------------------------------------------------// 719 // Try to pick a task 720 AutoLock _task_lock(*p_task_lock, std::defer_lock); 721 //--------------------------------------------------------------------// 722 723 auto leave_pool = [&]() { 724 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); }; 725 auto _pool_state = _state(); 726 if(_pool_state > 0) 727 { 728 // stop whole pool 729 if(_pool_state == thread_pool::state::STOPPED) 730 { 731 if(_task_lock.owns_lock()) 732 _task_lock.unlock(); 733 return true; 734 } 735 // single thread stoppage 736 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT 737 { 738 if(!_task_lock.owns_lock()) 739 _task_lock.lock(); 740 if(!m_is_stopped.empty() && m_is_stopped.back()) 741 { 742 m_stop_threads.push_back(tid); 743 m_is_stopped.pop_back(); 744 if(_task_lock.owns_lock()) 745 _task_lock.unlock(); 746 // exit entire function 747 return true; 748 } 749 if(_task_lock.owns_lock()) 750 _task_lock.unlock(); 751 } 752 } 753 return false; 754 }; 755 756 // We need to put condition.wait() in a loop for two reasons: 757 // 1. There can be spurious wake-ups (due to signal/ENITR) 758 // 2. When mutex is released for waiting, another thread can be woken up 759 // from a signal/broadcast and that thread can mess up the condition. 760 // So when the current thread wakes up the condition may no longer be 761 // actually true! 762 while(_task_queue->empty()) 763 { 764 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); }; 765 auto _size = [&]() { return _task_queue->true_size(); }; 766 auto _empty = [&]() { return _task_queue->empty(); }; 767 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); }; 768 769 if(leave_pool()) 770 return; 771 772 if(_task_queue->true_size() == 0) 773 { 774 if(m_thread_awake->load() > 0) 775 --(*m_thread_awake); 776 777 // lock before sleeping on condition 778 if(!_task_lock.owns_lock()) 779 _task_lock.lock(); 780 781 // Wait until there is a task in the queue 782 // Unlocks mutex while waiting, then locks it back when signaled 783 // use lambda to control waking 784 m_task_cond->wait(_task_lock, _wake); 785 786 if(_state() == thread_pool::state::STOPPED) 787 return; 788 789 // unlock if owned 790 if(_task_lock.owns_lock()) 791 _task_lock.unlock(); 792 793 // notify that is awake 794 if(m_thread_awake->load() < m_pool_size) 795 ++(*m_thread_awake); 796 } 797 else 798 break; 799 } 800 801 // release the lock 802 if(_task_lock.owns_lock()) 803 _task_lock.unlock(); 804 805 //----------------------------------------------------------------// 806 807 // leave pool if conditions dictate it 808 if(leave_pool()) 809 return; 810 811 // activate guard against recursive deadlock 812 data->within_task = true; 813 //----------------------------------------------------------------// 814 815 // execute the task(s) 816 while(!_task_queue->empty()) 817 { 818 auto _task = _task_queue->GetTask(); 819 if(_task) 820 { 821 (*_task)(); 822 } 823 } 824 //----------------------------------------------------------------// 825 826 // disable guard against recursive deadlock 827 data->within_task = false; 828 //----------------------------------------------------------------// 829 } 830 } 831 832 //======================================================================================// 833 834 } // namespace PTL 835