Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/src/ThreadPool.cc

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3, ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

  1 //
  2 // MIT License
  3 // Copyright (c) 2020 Jonathan R. Madsen
  4 // Permission is hereby granted, free of charge, to any person obtaining a copy
  5 // of this software and associated documentation files (the "Software"), to deal
  6 // in the Software without restriction, including without limitation the rights
  7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8 // copies of the Software, and to permit persons to whom the Software is
  9 // furnished to do so, subject to the following conditions:
 10 // The above copyright notice and this permission notice shall be included in
 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 18 //
 19 // ---------------------------------------------------------------
 20 //  Tasking class implementation
 21 //
 22 // Class Description:
 23 //
 24 // This file creates a class for an efficient thread-pool that
 25 // accepts work in the form of tasks.
 26 //
 27 // ---------------------------------------------------------------
 28 // Author: Jonathan Madsen (Feb 13th 2018)
 29 // ---------------------------------------------------------------
 30 
 31 #include "PTL/ThreadPool.hh"
 32 #include "PTL/GetEnv.hh"
 33 #include "PTL/ScopeDestructor.hh"
 34 #include "PTL/ThreadData.hh"
 35 #include "PTL/Threading.hh"
 36 #include "PTL/UserTaskQueue.hh"
 37 #include "PTL/VUserTaskQueue.hh"
 38 
 39 #include <cassert>
 40 #include <mutex>
 41 #include <new>
 42 #include <stdexcept>
 43 #include <thread>
 44 
 45 //======================================================================================//
 46 
namespace
{
// File-local accessor for the calling thread's ThreadData pointer slot
// (forwards to PTL::ThreadData::GetInstance()). Returned by reference so
// callers (e.g. start_thread, the ThreadPool ctor) can install a new
// ThreadData for the current thread.
PTL::ThreadData*&
thread_data()
{
    return PTL::ThreadData::GetInstance();
}
}  // namespace
 55 
 56 namespace PTL
 57 {
 58 //======================================================================================//
 59 
 60 ThreadPool::thread_id_map_t&
 61 ThreadPool::f_thread_ids()
 62 {
 63     static auto _v = thread_id_map_t{};
 64     return _v;
 65 }
 66 
 67 //======================================================================================//
 68 
 69 ThreadPool::size_type&
 70 ThreadPool::f_default_pool_size()
 71 {
 72     static size_type _v =
 73         GetEnv<size_type>("PTL_NUM_THREADS", Thread::hardware_concurrency());
 74     return _v;
 75 }
 76 
 77 //======================================================================================//
// static member function that calls the member function we want the thread to
// run
void
ThreadPool::start_thread(ThreadPool* tp, thread_data_t* _data, intmax_t _idx)
{
    if(tp->get_verbose() > 0)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] Starting thread " << _idx << "..." << std::endl;
    }

    // per-thread data; ownership is shared with the pool through _data below
    auto _thr_data = std::make_shared<ThreadData>(tp);
    {
        // register this thread's id/index under the pool-wide mutex
        AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
        if(!lock.owns_lock())
            lock.lock();
        if(_idx < 0)
            _idx = f_thread_ids().size();  // assign next sequential index
        f_thread_ids()[std::this_thread::get_id()] = _idx;
        SetThreadId((int)_idx);
        _data->emplace_back(_thr_data);
    }
    // publish this thread's ThreadData, then run the worker loop; note
    // execute_thread only returns when the pool shuts this thread down
    thread_data() = _thr_data.get();
    tp->record_entry();
    tp->execute_thread(thread_data()->current_queue);
    tp->record_exit();

    if(tp->get_verbose() > 0)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] Thread " << _idx << " terminating..."
                  << std::endl;
    }
}
112 
113 //======================================================================================//
114 
// Read-only access to the global ThreadId -> index map.
// NOTE(review): returned without acquiring TypeMutex<ThreadPool>, unlike the
// other accessors; callers iterating it concurrently with thread creation or
// destruction should hold that mutex themselves — confirm intended usage.
const ThreadPool::thread_id_map_t&
ThreadPool::get_thread_ids()
{
    return f_thread_ids();
}
120 
121 //======================================================================================//
122 
123 uintmax_t
124 ThreadPool::get_thread_id(ThreadId _tid)
125 {
126     uintmax_t _idx = 0;
127     {
128         AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
129         if(!lock.owns_lock())
130             lock.lock();
131         auto itr = f_thread_ids().find(_tid);
132         if(itr == f_thread_ids().end())
133         {
134             _idx                 = f_thread_ids().size();
135             f_thread_ids()[_tid] = _idx;
136         }
137         else
138         {
139             _idx = itr->second;
140         }
141     }
142     return _idx;
143 }
144 
145 //======================================================================================//
146 
// Sequential index of the calling thread (registering it if not yet known).
uintmax_t
ThreadPool::get_this_thread_id()
{
    return get_thread_id(ThisThread::get_id());
}
152 
153 //======================================================================================//
154 
155 uintmax_t
156 ThreadPool::add_thread_id(ThreadId _tid)
157 {
158     AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
159     if(!lock.owns_lock())
160         lock.lock();
161     if(f_thread_ids().find(_tid) == f_thread_ids().end())
162     {
163         auto _idx            = f_thread_ids().size();
164         f_thread_ids()[_tid] = _idx;
165         SetThreadId((int)_idx);
166     }
167     return f_thread_ids().at(_tid);
168 }
169 
170 //======================================================================================//
171 
// Construct the pool from a Config: copies the configuration knobs, starts
// in the NONINIT state, and (when _cfg.init) immediately spawns the workers.
ThreadPool::ThreadPool(const Config& _cfg)
: m_use_affinity{ _cfg.use_affinity }
, m_tbb_tp{ _cfg.use_tbb }
, m_verbose{ _cfg.verbose }
, m_priority{ _cfg.priority }
, m_pool_state{ std::make_shared<std::atomic_short>(thread_pool::state::NONINIT) }
, m_task_queue{ _cfg.task_queue }
, m_init_func{ _cfg.initializer }
, m_fini_func{ _cfg.finalizer }
, m_affinity_func{ _cfg.set_affinity }
{
    // registering the constructing thread first reserves index 0 for it
    auto master_id = get_this_thread_id();
    if(master_id != 0 && m_verbose > 1)
    {
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        std::cerr << "[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl;
    }

    // install a ThreadData for the constructing (master) thread
    thread_data() = new ThreadData(this);

    // initialize after get_this_thread_id so master is zero
    if(_cfg.init)
        this->initialize_threadpool(_cfg.pool_size);
}
196 
197 //======================================================================================//
198 
// Destructor: emergency shutdown path for pools that were not explicitly
// destroyed. Warns, signals STOPPED, wakes and joins the workers, then
// releases owned resources (queue when owned, TBB arena/group).
ThreadPool::~ThreadPool()
{
    if(m_alive_flag->load())
    {
        // workers still running: destroy_threadpool() was never called
        std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
                     "destroy_threadpool() before deleting the ThreadPool object to "
                     "eliminate this message."
                  << std::endl;
        m_pool_state->store(thread_pool::state::STOPPED);
        // wake every worker so it observes STOPPED and exits its loop
        m_task_lock->lock();
        m_task_cond->notify_all();
        m_task_lock->unlock();
        for(auto& itr : m_threads)
            itr.join();
        m_threads.clear();
    }

    // delete owned resources
    if(m_delete_task_queue)
        delete m_task_queue;

    delete m_tbb_task_arena;
    delete m_tbb_task_group;
}
223 
224 //======================================================================================//
225 
226 bool
227 ThreadPool::is_initialized() const
228 {
229     return !(m_pool_state->load() == thread_pool::state::NONINIT);
230 }
231 
232 //======================================================================================//
233 
234 void
235 ThreadPool::record_entry()
236 {
237     ++(*m_thread_active);
238 }
239 
240 //======================================================================================//
241 
242 void
243 ThreadPool::record_exit()
244 {
245     --(*m_thread_active);
246 }
247 
248 //======================================================================================//
249 
250 void
251 ThreadPool::set_affinity(intmax_t i, Thread& _thread) const
252 {
253     try
254     {
255         NativeThread native_thread = _thread.native_handle();
256         intmax_t     _pin          = m_affinity_func(i);
257         if(m_verbose > 0)
258         {
259             AutoLock lock(TypeMutex<decltype(std::cerr)>());
260             std::cerr << "[PTL::ThreadPool] Setting pin affinity for thread "
261                       << get_thread_id(_thread.get_id()) << " to " << _pin << std::endl;
262         }
263         SetPinAffinity((int)_pin, native_thread);
264     } catch(std::runtime_error& e)
265     {
266         std::cerr << "[PTL::ThreadPool] Error setting pin affinity: " << e.what()
267                   << std::endl;
268     }
269 }
270 
271 //======================================================================================//
272 
273 void
274 ThreadPool::set_priority(int _prio, Thread& _thread) const
275 {
276     try
277     {
278         NativeThread native_thread = _thread.native_handle();
279         if(m_verbose > 0)
280         {
281             AutoLock lock(TypeMutex<decltype(std::cerr)>());
282             std::cerr << "[PTL::ThreadPool] Setting thread "
283                       << get_thread_id(_thread.get_id()) << " priority to " << _prio
284                       << std::endl;
285         }
286         SetThreadPriority(_prio, native_thread);
287     } catch(std::runtime_error& e)
288     {
289         AutoLock lock(TypeMutex<decltype(std::cerr)>());
290         std::cerr << "[PTL::ThreadPool] Error setting thread priority: " << e.what()
291                   << std::endl;
292     }
293 }
294 
295 //======================================================================================//
296 
297 ThreadPool::size_type
298 ThreadPool::initialize_threadpool(size_type proposed_size)
299 {
300     //--------------------------------------------------------------------//
301     // return before initializing
302     if(proposed_size < 1)
303         return 0;
304 
305     //--------------------------------------------------------------------//
306     // store that has been started
307     if(!m_alive_flag->load())
308         m_pool_state->store(thread_pool::state::STARTED);
309 
310 #if defined(PTL_USE_TBB)
311     //--------------------------------------------------------------------//
312     // handle tbb task scheduler
313     if(m_tbb_tp)
314     {
315         m_tbb_tp                               = true;
316         m_pool_size                            = proposed_size;
317         tbb_global_control_t*& _global_control = tbb_global_control();
318         // delete if wrong size
319         if(m_pool_size != proposed_size)
320         {
321             delete _global_control;
322             _global_control = nullptr;
323         }
324 
325         if(!_global_control)
326         {
327             _global_control = new tbb_global_control_t(
328                 tbb::global_control::max_allowed_parallelism, proposed_size + 1);
329             if(m_verbose > 0)
330             {
331                 AutoLock lock(TypeMutex<decltype(std::cerr)>());
332                 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] initialized with "
333                           << m_pool_size << " threads." << std::endl;
334             }
335         }
336 
337         // create task group (used for async)
338         if(!m_tbb_task_group)
339         {
340             m_tbb_task_group = new tbb_task_group_t{};
341             execute_on_all_threads([this]() { m_init_func(); });
342         }
343 
344         return m_pool_size;
345     }
346 #endif
347 
348     m_alive_flag->store(true);
349 
350     //--------------------------------------------------------------------//
351     // if started, stop some thread if smaller or return if equal
352     if(m_pool_state->load() == thread_pool::state::STARTED)
353     {
354         if(m_pool_size > proposed_size)
355         {
356             while(stop_thread() > proposed_size)
357                 ;
358             if(m_verbose > 0)
359             {
360                 AutoLock lock(TypeMutex<decltype(std::cerr)>());
361                 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with "
362                           << m_pool_size << " threads." << std::endl;
363             }
364             if(!m_task_queue)
365             {
366                 m_delete_task_queue = true;
367                 m_task_queue        = new UserTaskQueue(m_pool_size);
368             }
369             else
370             {
371                 m_task_queue->resize(m_pool_size);
372             }
373             return m_pool_size;
374         }
375         else if(m_pool_size == proposed_size)  // NOLINT
376         {
377             if(m_verbose > 0)
378             {
379                 AutoLock lock(TypeMutex<decltype(std::cerr)>());
380                 std::cerr << "ThreadPool initialized with " << m_pool_size << " threads."
381                           << std::endl;
382             }
383             if(!m_task_queue)
384             {
385                 m_delete_task_queue = true;
386                 m_task_queue        = new UserTaskQueue(m_pool_size);
387             }
388             return m_pool_size;
389         }
390     }
391 
392     //--------------------------------------------------------------------//
393     // reserve enough space to prevent realloc later
394     {
395         AutoLock _task_lock(*m_task_lock);
396         m_is_joined.reserve(proposed_size);
397     }
398 
399     if(!m_task_queue)
400     {
401         m_delete_task_queue = true;
402         m_task_queue        = new UserTaskQueue(proposed_size);
403     }
404 
405     auto this_tid = get_this_thread_id();
406     for(size_type i = m_pool_size; i < proposed_size; ++i)
407     {
408         // add the threads
409         try
410         {
411             // create thread
412             Thread thr{ ThreadPool::start_thread, this, &m_thread_data,
413                         this_tid + i + 1 };
414             // only reaches here if successful creation of thread
415             ++m_pool_size;
416             // store thread
417             m_main_threads.push_back(thr.get_id());
418             // list of joined thread booleans
419             m_is_joined.push_back(false);
420             // set the affinity
421             if(m_use_affinity)
422                 set_affinity(i, thr);
423             set_priority(m_priority, thr);
424             // store
425             m_threads.emplace_back(std::move(thr));
426         } catch(std::runtime_error& e)
427         {
428             AutoLock lock(TypeMutex<decltype(std::cerr)>());
429             std::cerr << "[PTL::ThreadPool] " << e.what()
430                       << std::endl;  // issue creating thread
431             continue;
432         } catch(std::bad_alloc& e)
433         {
434             AutoLock lock(TypeMutex<decltype(std::cerr)>());
435             std::cerr << "[PTL::ThreadPool] " << e.what() << std::endl;
436             continue;
437         }
438     }
439     //------------------------------------------------------------------------//
440 
441     AutoLock _task_lock(*m_task_lock);
442 
443     // thread pool size doesn't match with join vector
444     // this will screw up joining later
445     if(m_is_joined.size() != m_main_threads.size())
446     {
447         std::stringstream ss;
448         ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
449            << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
450            << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
451 
452         throw std::runtime_error(ss.str());
453     }
454 
455     if(m_verbose > 0)
456     {
457         AutoLock lock(TypeMutex<decltype(std::cerr)>());
458         std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
459                   << " threads." << std::endl;
460     }
461 
462     return m_main_threads.size();
463 }
464 
465 //======================================================================================//
466 
// Shut down the entire pool: tear down TBB resources (if used), wake and
// join every native worker, clear bookkeeping, wait (bounded) for workers
// to record their exit, and delete the owned queue. Returns 0.
ThreadPool::size_type
ThreadPool::destroy_threadpool()
{
    // Note: this is not for synchronization, its for thread communication!
    // destroy_threadpool() will only be called from the main thread, yet
    // the modified m_pool_state may not show up to other threads until its
    // modified in a lock!
    //------------------------------------------------------------------------//
    m_pool_state->store(thread_pool::state::STOPPED);

    //--------------------------------------------------------------------//
    // handle tbb task scheduler
#if defined(PTL_USE_TBB)
    if(m_tbb_task_group)
    {
        // run finalizers on all workers, then wait for outstanding tasks
        execute_on_all_threads([this]() { m_fini_func(); });
        auto _func = [&]() { m_tbb_task_group->wait(); };
        if(m_tbb_task_arena)
            m_tbb_task_arena->execute(_func);
        else
            _func();
        delete m_tbb_task_group;
        m_tbb_task_group = nullptr;
    }
    if(m_tbb_task_arena)
    {
        delete m_tbb_task_arena;
        m_tbb_task_arena = nullptr;
    }
    if(m_tbb_tp && tbb_global_control())
    {
        // release the global parallelism limit
        tbb_global_control_t*& _global_control = tbb_global_control();
        delete _global_control;
        _global_control = nullptr;
        m_tbb_tp        = false;
        // NOTE(review): the cerr lock is acquired even when m_verbose <= 0
        // prints nothing — harmless, but it could live inside the if below
        AutoLock lock(TypeMutex<decltype(std::cerr)>());
        if(m_verbose > 0)
        {
            std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
        }
    }
#endif

    if(!m_alive_flag->load())
        return 0;

    //------------------------------------------------------------------------//
    // notify all threads we are shutting down
    m_task_lock->lock();
    m_task_cond->notify_all();
    m_task_lock->unlock();
    //------------------------------------------------------------------------//

    // mismatched bookkeeping would break the join loop below
    if(m_is_joined.size() != m_main_threads.size())
    {
        std::stringstream ss;
        ss << "   ThreadPool::destroy_thread_pool - boolean is_joined vector "
           << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
           << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";

        throw std::runtime_error(ss.str());
    }

    for(size_type i = 0; i < m_is_joined.size(); i++)
    {
        //--------------------------------------------------------------------//
        // join the i-th thread object, when one exists at this index
        if(i < m_threads.size())
            m_threads.at(i).join();

        //--------------------------------------------------------------------//
        // if its joined already, nothing else needs to be done
        if(m_is_joined.at(i))
            continue;

        //--------------------------------------------------------------------//
        // never erase the calling thread's own bookkeeping entry
        if(std::this_thread::get_id() == m_main_threads[i])
            continue;

        //--------------------------------------------------------------------//
        // thread id and index
        auto _tid = m_main_threads[i];

        //--------------------------------------------------------------------//
        // erase thread from thread ID list
        if(f_thread_ids().find(_tid) != f_thread_ids().end())
            f_thread_ids().erase(f_thread_ids().find(_tid));

        //--------------------------------------------------------------------//
        // it's joined
        m_is_joined.at(i) = true;
    }

    // drop all per-thread bookkeeping
    m_thread_data.clear();
    m_threads.clear();
    m_main_threads.clear();
    m_is_joined.clear();

    m_alive_flag->store(false);

    auto start   = std::chrono::steady_clock::now();
    auto elapsed = std::chrono::duration<double>{};
    // wait maximum of 30 seconds for threads to exit
    while(m_thread_active->load() > 0 && elapsed.count() < 30)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        elapsed = std::chrono::steady_clock::now() - start;
    }

    auto _active = m_thread_active->load();

    if(get_verbose() > 0)
    {
        if(_active == 0)
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
        }
        else
        {
            AutoLock lock(TypeMutex<decltype(std::cerr)>());
            std::cerr << "[PTL::ThreadPool] ThreadPool destroyed but " << _active
                      << " threads might still be active (and cause a termination error)"
                      << std::endl;
        }
    }

    if(m_delete_task_queue)
    {
        delete m_task_queue;
        m_task_queue = nullptr;
    }

    return 0;
}
603 
604 //======================================================================================//
605 
// Stop exactly one worker thread: publish a PARTIAL-stop request, wait for a
// worker to acknowledge, then remove it from the pool bookkeeping. Returns
// the resulting pool size (0 if the pool is not alive/empty).
ThreadPool::size_type
ThreadPool::stop_thread()
{
    if(!m_alive_flag->load() || m_pool_size == 0)
        return 0;

    // PARTIAL signals a single-thread stoppage to the worker loops
    m_pool_state->store(thread_pool::state::PARTIAL);

    //------------------------------------------------------------------------//
    // notify all threads we are shutting down
    m_task_lock->lock();
    m_is_stopped.push_back(true);
    m_task_cond->notify_one();
    m_task_lock->unlock();
    //------------------------------------------------------------------------//

    // NOTE(review): unthrottled busy-wait (no yield/sleep) until a worker
    // acknowledges by pushing its id onto m_stop_threads
    while(!m_is_stopped.empty() && m_stop_threads.empty())
        ;

    // lock up the task queue
    AutoLock _task_lock(*m_task_lock);

    // for each acknowledged thread: drop it from the main-thread list and
    // shrink the join-flag vector accordingly
    while(!m_stop_threads.empty())
    {
        auto tid = m_stop_threads.front();
        // remove from stopped
        m_stop_threads.pop_front();
        // remove from main
        for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
        {
            if(*itr == tid)
            {
                m_main_threads.erase(itr);
                break;
            }
        }
        // remove from join list
        m_is_joined.pop_back();
    }

    // restore the running state and publish the updated size
    m_pool_state->store(thread_pool::state::STARTED);

    m_pool_size = m_main_threads.size();
    return m_main_threads.size();
}
651 
652 //======================================================================================//
653 
654 ThreadPool::task_queue_t*&
655 ThreadPool::get_valid_queue(task_queue_t*& _queue) const
656 {
657     if(!_queue)
658         _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
659     return _queue;
660 }
661 //======================================================================================//
662 
// Temporary workaround for shared_ptr constructor GPFLT on Intel Macs and Clang 15
#if defined (__APPLE__) && defined(__amd64) && defined(__clang__)
[[clang::optnone]]
#endif
// Worker-thread main loop: run the init functor, wait for a valid queue,
// then repeatedly sleep on the task condition variable and drain tasks,
// until the pool state requests a full (STOPPED) or single (PARTIAL) exit.
// The finalizer functor runs on every exit path via ScopeDestructor.
void
ThreadPool::execute_thread(VUserTaskQueue* _task_queue)
{
    ++(*m_thread_awake);

    // initialization function
    m_init_func();
    // finalization function (executed when scope is destroyed)
    ScopeDestructor _fini{ [this]() { m_fini_func(); } };

    ThreadId    tid  = ThisThread::get_id();
    ThreadData* data = thread_data();
    // auto        thread_bin = _task_queue->GetThreadBin();
    // auto        workers    = _task_queue->workers();

    auto start   = std::chrono::steady_clock::now();
    auto elapsed = std::chrono::duration<double>{};
    // check for updates for 60 seconds max
    // NOTE(review): polls data->update() without sleeping, so this spins at
    // full speed until a queue appears or the timeout expires
    while(!_task_queue && elapsed.count() < 60)
    {
        elapsed = std::chrono::steady_clock::now() - start;
        data->update();
        _task_queue = data->current_queue;
    }

    if(!_task_queue)
    {
        --(*m_thread_awake);
        throw std::runtime_error("No task queue was found after 60 seconds!");
    }

    assert(data->current_queue != nullptr);
    assert(_task_queue == data->current_queue);

    // essentially a dummy run
    if(_task_queue)
    {
        data->within_task = true;
        auto _task        = _task_queue->GetTask();
        if(_task)
        {
            (*_task)();
        }
        data->within_task = false;
    }

    // threads stay in this loop forever until thread-pool destroyed
    while(true)
    {
        // cache the lock pointer so this thread keeps using the mutex that
        // was current on its first pass through the loop
        static thread_local auto p_task_lock = m_task_lock;

        //--------------------------------------------------------------------//
        // Try to pick a task
        AutoLock _task_lock(*p_task_lock, std::defer_lock);
        //--------------------------------------------------------------------//

        // returns true when this thread must leave the worker loop: either
        // the whole pool stopped, or this thread claimed a PARTIAL
        // (single-thread) stop request; always leaves the lock released
        auto leave_pool = [&]() {
            auto _state      = [&]() { return static_cast<int>(m_pool_state->load()); };
            auto _pool_state = _state();
            if(_pool_state > 0)
            {
                // stop whole pool
                if(_pool_state == thread_pool::state::STOPPED)
                {
                    if(_task_lock.owns_lock())
                        _task_lock.unlock();
                    return true;
                }
                // single thread stoppage
                else if(_pool_state == thread_pool::state::PARTIAL)  // NOLINT
                {
                    if(!_task_lock.owns_lock())
                        _task_lock.lock();
                    if(!m_is_stopped.empty() && m_is_stopped.back())
                    {
                        // acknowledge the stop request with this thread's id
                        m_stop_threads.push_back(tid);
                        m_is_stopped.pop_back();
                        if(_task_lock.owns_lock())
                            _task_lock.unlock();
                        // exit entire function
                        return true;
                    }
                    if(_task_lock.owns_lock())
                        _task_lock.unlock();
                }
            }
            return false;
        };

        // We need to put condition.wait() in a loop for two reasons:
        // 1. There can be spurious wake-ups (due to signal/EINTR)
        // 2. When mutex is released for waiting, another thread can be woken up
        //    from a signal/broadcast and that thread can mess up the condition.
        //    So when the current thread wakes up the condition may no longer be
        //    actually true!
        while(_task_queue->empty())
        {
            auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
            auto _size  = [&]() { return _task_queue->true_size(); };
            auto _empty = [&]() { return _task_queue->empty(); };
            auto _wake  = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };

            if(leave_pool())
                return;

            if(_task_queue->true_size() == 0)
            {
                if(m_thread_awake->load() > 0)
                    --(*m_thread_awake);

                // lock before sleeping on condition
                if(!_task_lock.owns_lock())
                    _task_lock.lock();

                // Wait until there is a task in the queue
                // Unlocks mutex while waiting, then locks it back when signaled
                // use lambda to control waking
                m_task_cond->wait(_task_lock, _wake);

                if(_state() == thread_pool::state::STOPPED)
                    return;

                // unlock if owned
                if(_task_lock.owns_lock())
                    _task_lock.unlock();

                // notify that is awake
                if(m_thread_awake->load() < m_pool_size)
                    ++(*m_thread_awake);
            }
            else
                break;
        }

        // release the lock
        if(_task_lock.owns_lock())
            _task_lock.unlock();

        //----------------------------------------------------------------//

        // leave pool if conditions dictate it
        if(leave_pool())
            return;

        // activate guard against recursive deadlock
        data->within_task = true;
        //----------------------------------------------------------------//

        // execute the task(s)
        while(!_task_queue->empty())
        {
            auto _task = _task_queue->GetTask();
            if(_task)
            {
                (*_task)();
            }
        }
        //----------------------------------------------------------------//

        // disable guard against recursive deadlock
        data->within_task = false;
        //----------------------------------------------------------------//
    }
}
831 
832 //======================================================================================//
833 
834 }  // namespace PTL
835