Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/src/ThreadPool.cc

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3, ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

Diff markup

Differences between /externals/ptl/src/ThreadPool.cc (Version 11.3.0) and /externals/ptl/src/ThreadPool.cc (Version 9.1.p2)


  1 //                                                  1 
  2 // MIT License                                    
  3 // Copyright (c) 2020 Jonathan R. Madsen          
  4 // Permission is hereby granted, free of charg    
  5 // of this software and associated documentati    
  6 // in the Software without restriction, includ    
  7 // to use, copy, modify, merge, publish, distr    
  8 // copies of the Software, and to permit perso    
  9 // furnished to do so, subject to the followin    
 10 // The above copyright notice and this permiss    
 11 // all copies or substantial portions of the S    
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR    
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT    
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH    
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR    
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS    
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI    
 18 //                                                
 19 // -------------------------------------------    
 20 //  Tasking class implementation                  
 21 //                                                
 22 // Class Description:                             
 23 //                                                
 24 // This file creates a class for an efficient     
 25 // accepts work in the form of tasks.             
 26 //                                                
 27 // -------------------------------------------    
 28 // Author: Jonathan Madsen (Feb 13th 2018)        
 29 // -------------------------------------------    
 30                                                   
 31 #include "PTL/ThreadPool.hh"                      
 32 #include "PTL/GetEnv.hh"                          
 33 #include "PTL/ScopeDestructor.hh"                 
 34 #include "PTL/ThreadData.hh"                      
 35 #include "PTL/Threading.hh"                       
 36 #include "PTL/UserTaskQueue.hh"                   
 37 #include "PTL/VUserTaskQueue.hh"                  
 38                                                   
 39 #include <cassert>                                
 40 #include <mutex>                                  
 41 #include <new>                                    
 42 #include <stdexcept>                              
 43 #include <thread>                                 
 44                                                   
 45 //============================================    
 46                                                   
 47 namespace                                         
 48 {                                                 
 49 PTL::ThreadData*&                                 
 50 thread_data()                                     
 51 {                                                 
 52     return PTL::ThreadData::GetInstance();        
 53 }                                                 
 54 }  // namespace                                   
 55                                                   
 56 namespace PTL                                     
 57 {                                                 
 58 //============================================    
 59                                                   
 60 ThreadPool::thread_id_map_t&                      
 61 ThreadPool::f_thread_ids()                        
 62 {                                                 
 63     static auto _v = thread_id_map_t{};           
 64     return _v;                                    
 65 }                                                 
 66                                                   
 67 //============================================    
 68                                                   
 69 ThreadPool::size_type&                            
 70 ThreadPool::f_default_pool_size()                 
 71 {                                                 
 72     static size_type _v =                         
 73         GetEnv<size_type>("PTL_NUM_THREADS", T    
 74     return _v;                                    
 75 }                                                 
 76                                                   
 77 //============================================    
 78 // static member function that calls the membe    
 79 // run                                            
 80 void                                              
 81 ThreadPool::start_thread(ThreadPool* tp, threa    
 82 {                                                 
 83     if(tp->get_verbose() > 0)                     
 84     {                                             
 85         AutoLock lock(TypeMutex<decltype(std::    
 86         std::cerr << "[PTL::ThreadPool] Starti    
 87     }                                             
 88                                                   
 89     auto _thr_data = std::make_shared<ThreadDa    
 90     {                                             
 91         AutoLock lock(TypeMutex<ThreadPool>(),    
 92         if(!lock.owns_lock())                     
 93             lock.lock();                          
 94         if(_idx < 0)                              
 95             _idx = f_thread_ids().size();         
 96         f_thread_ids()[std::this_thread::get_i    
 97         SetThreadId((int)_idx);                   
 98         _data->emplace_back(_thr_data);           
 99     }                                             
100     thread_data() = _thr_data.get();              
101     tp->record_entry();                           
102     tp->execute_thread(thread_data()->current_    
103     tp->record_exit();                            
104                                                   
105     if(tp->get_verbose() > 0)                     
106     {                                             
107         AutoLock lock(TypeMutex<decltype(std::    
108         std::cerr << "[PTL::ThreadPool] Thread    
109                   << std::endl;                   
110     }                                             
111 }                                                 
112                                                   
113 //============================================    
114                                                   
115 const ThreadPool::thread_id_map_t&                
116 ThreadPool::get_thread_ids()                      
117 {                                                 
118     return f_thread_ids();                        
119 }                                                 
120                                                   
121 //============================================    
122                                                   
123 uintmax_t                                         
124 ThreadPool::get_thread_id(ThreadId _tid)          
125 {                                                 
126     uintmax_t _idx = 0;                           
127     {                                             
128         AutoLock lock(TypeMutex<ThreadPool>(),    
129         if(!lock.owns_lock())                     
130             lock.lock();                          
131         auto itr = f_thread_ids().find(_tid);     
132         if(itr == f_thread_ids().end())           
133         {                                         
134             _idx                 = f_thread_id    
135             f_thread_ids()[_tid] = _idx;          
136         }                                         
137         else                                      
138         {                                         
139             _idx = itr->second;                   
140         }                                         
141     }                                             
142     return _idx;                                  
143 }                                                 
144                                                   
145 //============================================    
146                                                   
147 uintmax_t                                         
148 ThreadPool::get_this_thread_id()                  
149 {                                                 
150     return get_thread_id(ThisThread::get_id())    
151 }                                                 
152                                                   
153 //============================================    
154                                                   
155 uintmax_t                                         
156 ThreadPool::add_thread_id(ThreadId _tid)          
157 {                                                 
158     AutoLock lock(TypeMutex<ThreadPool>(), std    
159     if(!lock.owns_lock())                         
160         lock.lock();                              
161     if(f_thread_ids().find(_tid) == f_thread_i    
162     {                                             
163         auto _idx            = f_thread_ids().    
164         f_thread_ids()[_tid] = _idx;              
165         SetThreadId((int)_idx);                   
166     }                                             
167     return f_thread_ids().at(_tid);               
168 }                                                 
169                                                   
170 //============================================    
171                                                   
172 ThreadPool::ThreadPool(const Config& _cfg)        
173 : m_use_affinity{ _cfg.use_affinity }             
174 , m_tbb_tp{ _cfg.use_tbb }                        
175 , m_verbose{ _cfg.verbose }                       
176 , m_priority{ _cfg.priority }                     
177 , m_pool_state{ std::make_shared<std::atomic_s    
178 , m_task_queue{ _cfg.task_queue }                 
179 , m_init_func{ _cfg.initializer }                 
180 , m_fini_func{ _cfg.finalizer }                   
181 , m_affinity_func{ _cfg.set_affinity }            
182 {                                                 
183     auto master_id = get_this_thread_id();        
184     if(master_id != 0 && m_verbose > 1)           
185     {                                             
186         AutoLock lock(TypeMutex<decltype(std::    
187         std::cerr << "[PTL::ThreadPool] Thread    
188     }                                             
189                                                   
190     thread_data() = new ThreadData(this);         
191                                                   
192     // initialize after get_this_thread_id so     
193     if(_cfg.init)                                 
194         this->initialize_threadpool(_cfg.pool_    
195 }                                                 
196                                                   
197 //============================================    
198                                                   
199 ThreadPool::~ThreadPool()                         
200 {                                                 
201     if(m_alive_flag->load())                      
202     {                                             
203         std::cerr << "Warning! ThreadPool was     
204                      "destroy_threadpool() bef    
205                      "eliminate this message."    
206                   << std::endl;                   
207         m_pool_state->store(thread_pool::state    
208         m_task_lock->lock();                      
209         m_task_cond->notify_all();                
210         m_task_lock->unlock();                    
211         for(auto& itr : m_threads)                
212             itr.join();                           
213         m_threads.clear();                        
214     }                                             
215                                                   
216     // delete owned resources                     
217     if(m_delete_task_queue)                       
218         delete m_task_queue;                      
219                                                   
220     delete m_tbb_task_arena;                      
221     delete m_tbb_task_group;                      
222 }                                                 
223                                                   
224 //============================================    
225                                                   
226 bool                                              
227 ThreadPool::is_initialized() const                
228 {                                                 
229     return !(m_pool_state->load() == thread_po    
230 }                                                 
231                                                   
232 //============================================    
233                                                   
234 void                                              
235 ThreadPool::record_entry()                        
236 {                                                 
237     ++(*m_thread_active);                         
238 }                                                 
239                                                   
240 //============================================    
241                                                   
242 void                                              
243 ThreadPool::record_exit()                         
244 {                                                 
245     --(*m_thread_active);                         
246 }                                                 
247                                                   
248 //============================================    
249                                                   
250 void                                              
251 ThreadPool::set_affinity(intmax_t i, Thread& _    
252 {                                                 
253     try                                           
254     {                                             
255         NativeThread native_thread = _thread.n    
256         intmax_t     _pin          = m_affinit    
257         if(m_verbose > 0)                         
258         {                                         
259             AutoLock lock(TypeMutex<decltype(s    
260             std::cerr << "[PTL::ThreadPool] Se    
261                       << get_thread_id(_thread    
262         }                                         
263         SetPinAffinity((int)_pin, native_threa    
264     } catch(std::runtime_error& e)                
265     {                                             
266         std::cerr << "[PTL::ThreadPool] Error     
267                   << std::endl;                   
268     }                                             
269 }                                                 
270                                                   
271 //============================================    
272                                                   
273 void                                              
274 ThreadPool::set_priority(int _prio, Thread& _t    
275 {                                                 
276     try                                           
277     {                                             
278         NativeThread native_thread = _thread.n    
279         if(m_verbose > 0)                         
280         {                                         
281             AutoLock lock(TypeMutex<decltype(s    
282             std::cerr << "[PTL::ThreadPool] Se    
283                       << get_thread_id(_thread    
284                       << std::endl;               
285         }                                         
286         SetThreadPriority(_prio, native_thread    
287     } catch(std::runtime_error& e)                
288     {                                             
289         AutoLock lock(TypeMutex<decltype(std::    
290         std::cerr << "[PTL::ThreadPool] Error     
291                   << std::endl;                   
292     }                                             
293 }                                                 
294                                                   
295 //============================================    
296                                                   
297 ThreadPool::size_type                             
298 ThreadPool::initialize_threadpool(size_type pr    
299 {                                                 
300     //----------------------------------------    
301     // return before initializing                 
302     if(proposed_size < 1)                         
303         return 0;                                 
304                                                   
305     //----------------------------------------    
306     // store that has been started                
307     if(!m_alive_flag->load())                     
308         m_pool_state->store(thread_pool::state    
309                                                   
310 #if defined(PTL_USE_TBB)                          
311     //----------------------------------------    
312     // handle tbb task scheduler                  
313     if(m_tbb_tp)                                  
314     {                                             
315         m_tbb_tp                                  
316         m_pool_size                               
317         tbb_global_control_t*& _global_control    
318         // delete if wrong size                   
319         if(m_pool_size != proposed_size)          
320         {                                         
321             delete _global_control;               
322             _global_control = nullptr;            
323         }                                         
324                                                   
325         if(!_global_control)                      
326         {                                         
327             _global_control = new tbb_global_c    
328                 tbb::global_control::max_allow    
329             if(m_verbose > 0)                     
330             {                                     
331                 AutoLock lock(TypeMutex<declty    
332                 std::cerr << "[PTL::ThreadPool    
333                           << m_pool_size << "     
334             }                                     
335         }                                         
336                                                   
337         // create task group (used for async)     
338         if(!m_tbb_task_group)                     
339         {                                         
340             m_tbb_task_group = new tbb_task_gr    
341             execute_on_all_threads([this]() {     
342         }                                         
343                                                   
344         return m_pool_size;                       
345     }                                             
346 #endif                                            
347                                                   
348     m_alive_flag->store(true);                    
349                                                   
350     //----------------------------------------    
351     // if started, stop some thread if smaller    
352     if(m_pool_state->load() == thread_pool::st    
353     {                                             
354         if(m_pool_size > proposed_size)           
355         {                                         
356             while(stop_thread() > proposed_siz    
357                 ;                                 
358             if(m_verbose > 0)                     
359             {                                     
360                 AutoLock lock(TypeMutex<declty    
361                 std::cerr << "[PTL::ThreadPool    
362                           << m_pool_size << "     
363             }                                     
364             if(!m_task_queue)                     
365             {                                     
366                 m_delete_task_queue = true;       
367                 m_task_queue        = new User    
368             }                                     
369             else                                  
370             {                                     
371                 m_task_queue->resize(m_pool_si    
372             }                                     
373             return m_pool_size;                   
374         }                                         
375         else if(m_pool_size == proposed_size)     
376         {                                         
377             if(m_verbose > 0)                     
378             {                                     
379                 AutoLock lock(TypeMutex<declty    
380                 std::cerr << "ThreadPool initi    
381                           << std::endl;           
382             }                                     
383             if(!m_task_queue)                     
384             {                                     
385                 m_delete_task_queue = true;       
386                 m_task_queue        = new User    
387             }                                     
388             return m_pool_size;                   
389         }                                         
390     }                                             
391                                                   
392     //----------------------------------------    
393     // reserve enough space to prevent realloc    
394     {                                             
395         AutoLock _task_lock(*m_task_lock);        
396         m_is_joined.reserve(proposed_size);       
397     }                                             
398                                                   
399     if(!m_task_queue)                             
400     {                                             
401         m_delete_task_queue = true;               
402         m_task_queue        = new UserTaskQueu    
403     }                                             
404                                                   
405     auto this_tid = get_this_thread_id();         
406     for(size_type i = m_pool_size; i < propose    
407     {                                             
408         // add the threads                        
409         try                                       
410         {                                         
411             // create thread                      
412             Thread thr{ ThreadPool::start_thre    
413                         this_tid + i + 1 };       
414             // only reaches here if successful    
415             ++m_pool_size;                        
416             // store thread                       
417             m_main_threads.push_back(thr.get_i    
418             // list of joined thread booleans     
419             m_is_joined.push_back(false);         
420             // set the affinity                   
421             if(m_use_affinity)                    
422                 set_affinity(i, thr);             
423             set_priority(m_priority, thr);        
424             // store                              
425             m_threads.emplace_back(std::move(t    
426         } catch(std::runtime_error& e)            
427         {                                         
428             AutoLock lock(TypeMutex<decltype(s    
429             std::cerr << "[PTL::ThreadPool] "     
430                       << std::endl;  // issue     
431             continue;                             
432         } catch(std::bad_alloc& e)                
433         {                                         
434             AutoLock lock(TypeMutex<decltype(s    
435             std::cerr << "[PTL::ThreadPool] "     
436             continue;                             
437         }                                         
438     }                                             
439     //----------------------------------------    
440                                                   
441     AutoLock _task_lock(*m_task_lock);            
442                                                   
443     // thread pool size doesn't match with joi    
444     // this will screw up joining later           
445     if(m_is_joined.size() != m_main_threads.si    
446     {                                             
447         std::stringstream ss;                     
448         ss << "ThreadPool::initialize_threadpo    
449            << "is a different size than thread    
450            << m_main_threads.size() << " (tid:    
451                                                   
452         throw std::runtime_error(ss.str());       
453     }                                             
454                                                   
455     if(m_verbose > 0)                             
456     {                                             
457         AutoLock lock(TypeMutex<decltype(std::    
458         std::cerr << "[PTL::ThreadPool] Thread    
459                   << " threads." << std::endl;    
460     }                                             
461                                                   
462     return m_main_threads.size();                 
463 }                                                 
464                                                   
465 //============================================    
466                                                   
467 ThreadPool::size_type                             
468 ThreadPool::destroy_threadpool()                  
469 {                                                 
470     // Note: this is not for synchronization,     
471     // destroy_threadpool() will only be calle    
472     // the modified m_pool_state may not show     
473     // modified in a lock!                        
474     //----------------------------------------    
475     m_pool_state->store(thread_pool::state::ST    
476                                                   
477     //----------------------------------------    
478     // handle tbb task scheduler                  
479 #if defined(PTL_USE_TBB)                          
480     if(m_tbb_task_group)                          
481     {                                             
482         execute_on_all_threads([this]() { m_fi    
483         auto _func = [&]() { m_tbb_task_group-    
484         if(m_tbb_task_arena)                      
485             m_tbb_task_arena->execute(_func);     
486         else                                      
487             _func();                              
488         delete m_tbb_task_group;                  
489         m_tbb_task_group = nullptr;               
490     }                                             
491     if(m_tbb_task_arena)                          
492     {                                             
493         delete m_tbb_task_arena;                  
494         m_tbb_task_arena = nullptr;               
495     }                                             
496     if(m_tbb_tp && tbb_global_control())          
497     {                                             
498         tbb_global_control_t*& _global_control    
499         delete _global_control;                   
500         _global_control = nullptr;                
501         m_tbb_tp        = false;                  
502         AutoLock lock(TypeMutex<decltype(std::    
503         if(m_verbose > 0)                         
504         {                                         
505             std::cerr << "[PTL::ThreadPool] Th    
506         }                                         
507     }                                             
508 #endif                                            
509                                                   
510     if(!m_alive_flag->load())                     
511         return 0;                                 
512                                                   
513     //----------------------------------------    
514     // notify all threads we are shutting down    
515     m_task_lock->lock();                          
516     m_task_cond->notify_all();                    
517     m_task_lock->unlock();                        
518     //----------------------------------------    
519                                                   
520     if(m_is_joined.size() != m_main_threads.si    
521     {                                             
522         std::stringstream ss;                     
523         ss << "   ThreadPool::destroy_thread_p    
524            << "is a different size than thread    
525            << m_main_threads.size() << " (tid:    
526                                                   
527         throw std::runtime_error(ss.str());       
528     }                                             
529                                                   
530     for(size_type i = 0; i < m_is_joined.size(    
531     {                                             
532         //------------------------------------    
533         //                                        
534         if(i < m_threads.size())                  
535             m_threads.at(i).join();               
536                                                   
537         //------------------------------------    
538         // if its joined already, nothing else    
539         if(m_is_joined.at(i))                     
540             continue;                             
541                                                   
542         //------------------------------------    
543         // join                                   
544         if(std::this_thread::get_id() == m_mai    
545             continue;                             
546                                                   
547         //------------------------------------    
548         // thread id and index                    
549         auto _tid = m_main_threads[i];            
550                                                   
551         //------------------------------------    
552         // erase thread from thread ID list       
553         if(f_thread_ids().find(_tid) != f_thre    
554             f_thread_ids().erase(f_thread_ids(    
555                                                   
556         //------------------------------------    
557         // it's joined                            
558         m_is_joined.at(i) = true;                 
559     }                                             
560                                                   
561     m_thread_data.clear();                        
562     m_threads.clear();                            
563     m_main_threads.clear();                       
564     m_is_joined.clear();                          
565                                                   
566     m_alive_flag->store(false);                   
567                                                   
568     auto start   = std::chrono::steady_clock::    
569     auto elapsed = std::chrono::duration<doubl    
570     // wait maximum of 30 seconds for threads     
571     while(m_thread_active->load() > 0 && elaps    
572     {                                             
573         std::this_thread::sleep_for(std::chron    
574         elapsed = std::chrono::steady_clock::n    
575     }                                             
576                                                   
577     auto _active = m_thread_active->load();       
578                                                   
579     if(get_verbose() > 0)                         
580     {                                             
581         if(_active == 0)                          
582         {                                         
583             AutoLock lock(TypeMutex<decltype(s    
584             std::cerr << "[PTL::ThreadPool] Th    
585         }                                         
586         else                                      
587         {                                         
588             AutoLock lock(TypeMutex<decltype(s    
589             std::cerr << "[PTL::ThreadPool] Th    
590                       << " threads might still    
591                       << std::endl;               
592         }                                         
593     }                                             
594                                                   
595     if(m_delete_task_queue)                       
596     {                                             
597         delete m_task_queue;                      
598         m_task_queue = nullptr;                   
599     }                                             
600                                                   
601     return 0;                                     
602 }                                                 
603                                                   
604 //============================================    
605                                                   
606 ThreadPool::size_type                             
607 ThreadPool::stop_thread()                         
608 {                                                 
609     if(!m_alive_flag->load() || m_pool_size ==    
610         return 0;                                 
611                                                   
612     m_pool_state->store(thread_pool::state::PA    
613                                                   
614     //----------------------------------------    
615     // notify all threads we are shutting down    
616     m_task_lock->lock();                          
617     m_is_stopped.push_back(true);                 
618     m_task_cond->notify_one();                    
619     m_task_lock->unlock();                        
620     //----------------------------------------    
621                                                   
622     while(!m_is_stopped.empty() && m_stop_thre    
623         ;                                         
624                                                   
625     // lock up the task queue                     
626     AutoLock _task_lock(*m_task_lock);            
627                                                   
628     while(!m_stop_threads.empty())                
629     {                                             
630         auto tid = m_stop_threads.front();        
631         // remove from stopped                    
632         m_stop_threads.pop_front();               
633         // remove from main                       
634         for(auto itr = m_main_threads.begin();    
635         {                                         
636             if(*itr == tid)                       
637             {                                     
638                 m_main_threads.erase(itr);        
639                 break;                            
640             }                                     
641         }                                         
642         // remove from join list                  
643         m_is_joined.pop_back();                   
644     }                                             
645                                                   
646     m_pool_state->store(thread_pool::state::ST    
647                                                   
648     m_pool_size = m_main_threads.size();          
649     return m_main_threads.size();                 
650 }                                                 
651                                                   
652 //============================================    
653                                                   
654 ThreadPool::task_queue_t*&                        
655 ThreadPool::get_valid_queue(task_queue_t*& _qu    
656 {                                                 
657     if(!_queue)                                   
658         _queue = new UserTaskQueue{ static_cas    
659     return _queue;                                
660 }                                                 
661 //============================================    
662                                                   
663 // Temporary workaround for shared_ptr constru    
664 #if defined (__APPLE__) && defined(__amd64) &&    
665 [[clang::optnone]]                                
666 #endif                                            
667 void                                              
668 ThreadPool::execute_thread(VUserTaskQueue* _ta    
669 {                                                 
670     ++(*m_thread_awake);                          
671                                                   
672     // initialization function                    
673     m_init_func();                                
674     // finalization function (executed when sc    
675     ScopeDestructor _fini{ [this]() { m_fini_f    
676                                                   
677     ThreadId    tid  = ThisThread::get_id();      
678     ThreadData* data = thread_data();             
679     // auto        thread_bin = _task_queue->G    
680     // auto        workers    = _task_queue->w    
681                                                   
682     auto start   = std::chrono::steady_clock::    
683     auto elapsed = std::chrono::duration<doubl    
684     // check for updates for 60 seconds max       
685     while(!_task_queue && elapsed.count() < 60    
686     {                                             
687         elapsed = std::chrono::steady_clock::n    
688         data->update();                           
689         _task_queue = data->current_queue;        
690     }                                             
691                                                   
692     if(!_task_queue)                              
693     {                                             
694         --(*m_thread_awake);                      
695         throw std::runtime_error("No task queu    
696     }                                             
697                                                   
698     assert(data->current_queue != nullptr);       
699     assert(_task_queue == data->current_queue)    
700                                                   
701     // essentially a dummy run                    
702     if(_task_queue)                               
703     {                                             
704         data->within_task = true;                 
705         auto _task        = _task_queue->GetTa    
706         if(_task)                                 
707         {                                         
708             (*_task)();                           
709         }                                         
710         data->within_task = false;                
711     }                                             
712                                                   
713     // threads stay in this loop forever until    
714     while(true)                                   
715     {                                             
716         static thread_local auto p_task_lock =    
717                                                   
718         //------------------------------------    
719         // Try to pick a task                     
720         AutoLock _task_lock(*p_task_lock, std:    
721         //------------------------------------    
722                                                   
723         auto leave_pool = [&]() {                 
724             auto _state      = [&]() { return     
725             auto _pool_state = _state();          
726             if(_pool_state > 0)                   
727             {                                     
728                 // stop whole pool                
729                 if(_pool_state == thread_pool:    
730                 {                                 
731                     if(_task_lock.owns_lock())    
732                         _task_lock.unlock();      
733                     return true;                  
734                 }                                 
735                 // single thread stoppage         
736                 else if(_pool_state == thread_    
737                 {                                 
738                     if(!_task_lock.owns_lock()    
739                         _task_lock.lock();        
740                     if(!m_is_stopped.empty() &    
741                     {                             
742                         m_stop_threads.push_ba    
743                         m_is_stopped.pop_back(    
744                         if(_task_lock.owns_loc    
745                             _task_lock.unlock(    
746                         // exit entire functio    
747                         return true;              
748                     }                             
749                     if(_task_lock.owns_lock())    
750                         _task_lock.unlock();      
751                 }                                 
752             }                                     
753             return false;                         
754         };                                        
755                                                   
756         // We need to put condition.wait() in     
757         // 1. There can be spurious wake-ups (    
758         // 2. When mutex is released for waiti    
759         //    from a signal/broadcast and that    
760         //    So when the current thread wakes    
761         //    actually true!                      
762         while(_task_queue->empty())               
763         {                                         
764             auto _state = [&]() { return stati    
765             auto _size  = [&]() { return _task    
766             auto _empty = [&]() { return _task    
767             auto _wake  = [&]() { return (!_em    
768                                                   
769             if(leave_pool())                      
770                 return;                           
771                                                   
772             if(_task_queue->true_size() == 0)     
773             {                                     
774                 if(m_thread_awake->load() > 0)    
775                     --(*m_thread_awake);          
776                                                   
777                 // lock before sleeping on con    
778                 if(!_task_lock.owns_lock())       
779                     _task_lock.lock();            
780                                                   
781                 // Wait until there is a task     
782                 // Unlocks mutex while waiting    
783                 // use lambda to control wakin    
784                 m_task_cond->wait(_task_lock,     
785                                                   
786                 if(_state() == thread_pool::st    
787                     return;                       
788                                                   
789                 // unlock if owned                
790                 if(_task_lock.owns_lock())        
791                     _task_lock.unlock();          
792                                                   
793                 // notify that is awake           
794                 if(m_thread_awake->load() < m_    
795                     ++(*m_thread_awake);          
796             }                                     
797             else                                  
798                 break;                            
799         }                                         
800                                                   
801         // release the lock                       
802         if(_task_lock.owns_lock())                
803             _task_lock.unlock();                  
804                                                   
805         //------------------------------------    
806                                                   
807         // leave pool if conditions dictate it    
808         if(leave_pool())                          
809             return;                               
810                                                   
811         // activate guard against recursive de    
812         data->within_task = true;                 
813         //------------------------------------    
814                                                   
815         // execute the task(s)                    
816         while(!_task_queue->empty())              
817         {                                         
818             auto _task = _task_queue->GetTask(    
819             if(_task)                             
820             {                                     
821                 (*_task)();                       
822             }                                     
823         }                                         
824         //------------------------------------    
825                                                   
826         // disable guard against recursive dea    
827         data->within_task = false;                
828         //------------------------------------    
829     }                                             
830 }                                                 
831                                                   
832 //============================================    
833                                                   
834 }  // namespace PTL                               
835