Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/include/PTL/ThreadPool.hh

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3, ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

Diff markup

Differences between /externals/ptl/include/PTL/ThreadPool.hh (Version 11.3.0) and /externals/ptl/include/PTL/ThreadPool.hh (Version 6.0)


  1 //                                                  1 
  2 // MIT License                                    
  3 // Copyright (c) 2020 Jonathan R. Madsen          
  4 // Permission is hereby granted, free of charg    
  5 // of this software and associated documentati    
  6 // in the Software without restriction, includ    
  7 // to use, copy, modify, merge, publish, distr    
  8 // copies of the Software, and to permit perso    
  9 // furnished to do so, subject to the followin    
 10 // The above copyright notice and this permiss    
 11 // all copies or substantial portions of the S    
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR    
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT    
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH    
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR    
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS    
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI    
 18 //                                                
 19 // -------------------------------------------    
 20 // Tasking class header file                      
 21 //                                                
 22 // Class Description:                             
 23 //                                                
 24 // This file creates a class for an efficient     
 25 // accepts work in the form of tasks.             
 26 //                                                
 27 // -------------------------------------------    
 28 // Author: Jonathan Madsen (Feb 13th 2018)        
 29 // -------------------------------------------    
 30                                                   
 31 #pragma once                                      
 32                                                   
 33 #include "PTL/AutoLock.hh"                        
 34 #ifndef G4GMAKE                                   
 35 #include ""                                       
 36 #endif                                            
 37 #include "PTL/ThreadData.hh"                      
 38 #include "PTL/Threading.hh"                       
 39 #include "PTL/Types.hh"                           
 40 #include "PTL/VTask.hh"                           
 41 #include "PTL/VUserTaskQueue.hh"                  
 42                                                   
 43 #if defined(PTL_USE_TBB)                          
 44 #    if !defined(TBB_SUPPRESS_DEPRECATED_MESSA    
 45 #        define TBB_SUPPRESS_DEPRECATED_MESSAG    
 46 #    endif                                        
 47 #    if !defined(TBB_PREVIEW_GLOBAL_CONTROL)      
 48 #        define TBB_PREVIEW_GLOBAL_CONTROL 1      
 49 #    endif                                        
 50 #    include <tbb/global_control.h>               
 51 #    include <tbb/task_arena.h>                   
 52 #    include <tbb/task_group.h>                   
 53 #endif                                            
 54                                                   
 55 #include <algorithm>                              
 56 #include <atomic>                                 
 57 #include <chrono>                                 
 58 #include <cstdint>                                
 59 #include <cstdlib>                                
 60 #include <deque>                                  
 61 #include <functional>                             
 62 #include <iostream>                               
 63 #include <map>                                    
 64 #include <memory>                                 
 65 #include <mutex>  // IWYU pragma: keep            
 66 #include <set>                                    
 67 #include <thread>                                 
 68 #include <type_traits>  // IWYU pragma: keep      
 69 #include <unordered_map>                          
 70 #include <utility>                                
 71 #include <vector>                                 
 72                                                   
 73 namespace PTL                                     
 74 {                                                 
 75 namespace thread_pool                             
 76 {                                                 
 77 namespace state                                   
 78 {                                                 
 79 static const short STARTED = 0;                   
 80 static const short PARTIAL = 1;                   
 81 static const short STOPPED = 2;                   
 82 static const short NONINIT = 3;                   
 83                                                   
 84 }  // namespace state                             
 85 }  // namespace thread_pool                       
 86                                                   
 87 class ThreadPool                                  
 88 {                                                 
 89 public:                                           
 90     template <typename KeyT, typename MappedT,    
 91     using uomap = std::unordered_map<KeyT, Map    
 92                                                   
 93     // pod-types                                  
 94     using size_type        = size_t;              
 95     using task_count_type  = std::shared_ptr<s    
 96     using atomic_int_type  = std::shared_ptr<s    
 97     using pool_state_type  = std::shared_ptr<s    
 98     using atomic_bool_type = std::shared_ptr<s    
 99     // objects                                    
100     using task_type    = VTask;                   
101     using lock_t       = std::shared_ptr<Mutex    
102     using condition_t  = std::shared_ptr<Condi    
103     using task_pointer = std::shared_ptr<task_    
104     using task_queue_t = VUserTaskQueue;          
105     // containers                                 
106     using thread_list_t      = std::deque<Thre    
107     using bool_list_t        = std::vector<boo    
108     using thread_id_map_t    = std::map<Thread    
109     using thread_index_map_t = std::map<uintma    
110     using thread_vec_t       = std::vector<Thr    
111     using thread_data_t      = std::vector<std    
112     // functions                                  
113     using initialize_func_t = std::function<vo    
114     using finalize_func_t   = std::function<vo    
115     using affinity_func_t   = std::function<in    
116                                                   
117     static affinity_func_t& affinity_functor()    
118     {                                             
119         static affinity_func_t _v = [](intmax_    
120             static std::atomic<intmax_t> assig    
121             intmax_t                     _assi    
122             return _assign % Thread::hardware_    
123         };                                        
124         return _v;                                
125     }                                             
126                                                   
127     static initialize_func_t& initialization_f    
128     {                                             
129         static initialize_func_t _v = []() {};    
130         return _v;                                
131     }                                             
132                                                   
133     static finalize_func_t& finalization_funct    
134     {                                             
135         static finalize_func_t _v = []() {};      
136         return _v;                                
137     }                                             
138                                                   
139     struct Config                                 
140     {                                             
141         bool              init         = true;    
142         bool              use_tbb      = false    
143         bool              use_affinity = false    
144         int               verbose      = 0;       
145         int               priority     = 0;       
146         size_type         pool_size    = f_def    
147         VUserTaskQueue*   task_queue   = nullp    
148         affinity_func_t   set_affinity = affin    
149         initialize_func_t initializer  = initi    
150         finalize_func_t   finalizer    = final    
151     };                                            
152                                                   
153 public:                                           
154     // Constructor and Destructors                
155     explicit ThreadPool(const Config&);           
156     ~ThreadPool();                                
157     ThreadPool(const ThreadPool&) = delete;       
158     ThreadPool(ThreadPool&&)      = default;      
159     ThreadPool& operator=(const ThreadPool&) =    
160     ThreadPool& operator=(ThreadPool&&) = defa    
161                                                   
162 public:                                           
163     // Public functions                           
164     size_type initialize_threadpool(size_type)    
165     size_type destroy_threadpool();               
166     size_type stop_thread();                      
167                                                   
168     template <typename FuncT>                     
169     void execute_on_all_threads(FuncT&& _func)    
170                                                   
171     template <typename FuncT>                     
172     void execute_on_specific_threads(const std    
173                                      FuncT&&      
174                                                   
175     task_queue_t*  get_queue() const { return     
176     task_queue_t*& get_valid_queue(task_queue_    
177                                                   
178     bool is_tbb_threadpool() const { return m_    
179                                                   
180 public:                                           
181     /// set the default pool size                 
182     static void set_default_size(size_type _v)    
183                                                   
184     /// get the default pool size                 
185     static size_type get_default_size() { retu    
186                                                   
187 public:                                           
188     // add tasks for threads to process           
189     size_type add_task(task_pointer&& task, in    
190     // size_type add_thread_task(ThreadId id,     
191     // add a generic container with iterator      
192     template <typename ListT>                     
193     size_type add_tasks(ListT&);                  
194                                                   
195     Thread* get_thread(size_type _n) const;       
196     Thread* get_thread(std::thread::id id) con    
197                                                   
198     // only relevant when compiled with PTL_US    
199     static tbb_global_control_t*& tbb_global_c    
200                                                   
201     void set_initialization(initialize_func_t     
202     void set_finalization(finalize_func_t f) {    
203                                                   
204     void reset_initialization()                   
205     {                                             
206         m_init_func = []() {};                    
207     }                                             
208     void reset_finalization()                     
209     {                                             
210         m_fini_func = []() {};                    
211     }                                             
212                                                   
213 public:                                           
214     // get the pool state                         
215     const pool_state_type& state() const { ret    
216     // see how many main task threads there ar    
217     size_type size() const { return m_pool_siz    
218     // set the thread pool size                   
219     void resize(size_type _n);                    
220     // affinity assigns threads to cores, assi    
221     bool using_affinity() const { return m_use    
222     bool is_alive() { return m_alive_flag->loa    
223     void notify();                                
224     void notify_all();                            
225     void notify(size_type);                       
226     bool is_initialized() const;                  
227     int  get_active_threads_count() const { re    
228                                                   
229     void set_affinity(affinity_func_t f) { m_a    
230     void set_affinity(intmax_t i, Thread&) con    
231     void set_priority(int _prio, Thread&) cons    
232                                                   
233     void set_verbose(int n) { m_verbose = n; }    
234     int  get_verbose() const { return m_verbos    
235     bool is_main() const { return ThisThread::    
236                                                   
237     tbb_task_arena_t* get_task_arena();           
238                                                   
239 public:                                           
240     // read FORCE_NUM_THREADS environment vari    
241     static const thread_id_map_t& get_thread_i    
242     static uintmax_t              get_thread_i    
243     static uintmax_t              get_this_thr    
244     static uintmax_t              add_thread_i    
245                                                   
246 private:                                          
247     void execute_thread(VUserTaskQueue*);  //     
248     int  insert(task_pointer&&, int = -1);        
249     int  run_on_this(task_pointer&&);             
250                                                   
251 private:                                          
252     // called in THREAD INIT                      
253     static void start_thread(ThreadPool*, thre    
254                                                   
255     void record_entry();                          
256     void record_exit();                           
257                                                   
258 private:                                          
259     // Private variables                          
260     // random                                     
261     bool             m_use_affinity      = fal    
262     bool             m_tbb_tp            = fal    
263     bool             m_delete_task_queue = fal    
264     int              m_verbose           = 0;     
265     int              m_priority          = 0;     
266     size_type        m_pool_size         = 0;     
267     ThreadId         m_main_tid          = Thi    
268     atomic_bool_type m_alive_flag        = std    
269     pool_state_type  m_pool_state        = std    
270     atomic_int_type  m_thread_awake      = std    
271     atomic_int_type  m_thread_active     = std    
272                                                   
273     // locks                                      
274     lock_t m_task_lock = std::make_shared<Mute    
275     // conditions                                 
276     condition_t m_task_cond = std::make_shared    
277                                                   
278     // containers                                 
279     bool_list_t   m_is_joined    = {};  // joi    
280     bool_list_t   m_is_stopped   = {};  // let    
281     thread_list_t m_main_threads = {};  // sto    
282     thread_list_t m_stop_threads = {};  // sto    
283     thread_vec_t  m_threads      = {};            
284     thread_data_t m_thread_data  = {};            
285                                                   
286     // task queue                                 
287     task_queue_t*     m_task_queue     = nullp    
288     tbb_task_arena_t* m_tbb_task_arena = nullp    
289     tbb_task_group_t* m_tbb_task_group = nullp    
290                                                   
291     // functions                                  
292     initialize_func_t m_init_func     = initia    
293     finalize_func_t   m_fini_func     = finali    
294     affinity_func_t   m_affinity_func = affini    
295                                                   
296 private:                                          
297     static size_type&       f_default_pool_siz    
298     static thread_id_map_t& f_thread_ids();       
299 };                                                
300                                                   
301 //--------------------------------------------    
302 inline void                                       
303 ThreadPool::notify()                              
304 {                                                 
305     // wake up one thread that is waiting for     
306     if(m_thread_awake->load() < m_pool_size)      
307     {                                             
308         AutoLock l(*m_task_lock);                 
309         m_task_cond->notify_one();                
310     }                                             
311 }                                                 
312 //--------------------------------------------    
313 inline void                                       
314 ThreadPool::notify_all()                          
315 {                                                 
316     // wake all threads                           
317     AutoLock l(*m_task_lock);                     
318     m_task_cond->notify_all();                    
319 }                                                 
320 //--------------------------------------------    
321 inline void                                       
322 ThreadPool::notify(size_type ntasks)              
323 {                                                 
324     if(ntasks == 0)                               
325         return;                                   
326                                                   
327     // wake up as many threads that tasks just    
328     if(m_thread_awake->load() < m_pool_size)      
329     {                                             
330         AutoLock l(*m_task_lock);                 
331         if(ntasks < this->size())                 
332         {                                         
333             for(size_type i = 0; i < ntasks; +    
334                 m_task_cond->notify_one();        
335         }                                         
336         else                                      
337         {                                         
338             m_task_cond->notify_all();            
339         }                                         
340     }                                             
341 }                                                 
342 //--------------------------------------------    
343 // local function for getting the tbb task sch    
344 inline tbb_global_control_t*&                     
345 ThreadPool::tbb_global_control()                  
346 {                                                 
347     static thread_local tbb_global_control_t*     
348     return _instance;                             
349 }                                                 
350 //--------------------------------------------    
351 // task arena                                     
352 inline tbb_task_arena_t*                          
353 ThreadPool::get_task_arena()                      
354 {                                                 
355 #if defined(PTL_USE_TBB)                          
356     // create a task arena                        
357     if(!m_tbb_task_arena)                         
358     {                                             
359         auto _sz = (tbb_global_control())         
360                        ? tbb_global_control()-    
361                              tbb::global_contr    
362                        : size();                  
363         m_tbb_task_arena = new tbb_task_arena_    
364         m_tbb_task_arena->initialize(_sz, 1);     
365     }                                             
366 #else                                             
367     if(!m_tbb_task_arena)                         
368         m_tbb_task_arena = new tbb_task_arena_    
369 #endif                                            
370     return m_tbb_task_arena;                      
371 }                                                 
372 //--------------------------------------------    
373 inline void                                       
374 ThreadPool::resize(size_type _n)                  
375 {                                                 
376     initialize_threadpool(_n);                    
377     if(m_task_queue)                              
378         m_task_queue->resize(static_cast<intma    
379 }                                                 
380 //--------------------------------------------    
381 inline int                                        
382 ThreadPool::run_on_this(task_pointer&& _task)     
383 {                                                 
384     auto&& _func = [_task]() { (*_task)(); };     
385                                                   
386     if(m_tbb_tp && m_tbb_task_group)              
387     {                                             
388         auto* _arena = get_task_arena();          
389         _arena->execute([this, _func]() { this    
390     }                                             
391     else                                          
392     {                                             
393         _func();                                  
394     }                                             
395     // return the number of tasks added to tas    
396     return 0;                                     
397 }                                                 
398 //--------------------------------------------    
399 inline int                                        
400 ThreadPool::insert(task_pointer&& task, int bi    
401 {                                                 
402     static thread_local ThreadData* _data = Th    
403                                                   
404     // pass the task to the queue                 
405     auto ibin = get_valid_queue(m_task_queue)-    
406     notify();                                     
407     return (int)ibin;                             
408 }                                                 
409 //--------------------------------------------    
410 inline ThreadPool::size_type                      
411 ThreadPool::add_task(task_pointer&& task, int     
412 {                                                 
413     // if not native (i.e. TBB) or we haven't     
414     if(m_tbb_tp || !task->is_native_task() ||     
415         return static_cast<size_type>(run_on_t    
416                                                   
417     return static_cast<size_type>(insert(std::    
418 }                                                 
419 //--------------------------------------------    
420 template <typename ListT>                         
421 inline ThreadPool::size_type                      
422 ThreadPool::add_tasks(ListT& c)                   
423 {                                                 
424     if(!m_alive_flag)  // if we haven't built     
425     {                                             
426         for(auto& itr : c)                        
427             run(itr);                             
428         c.clear();                                
429         return 0;                                 
430     }                                             
431                                                   
432     // TODO: put a limit on how many tasks can    
433     auto c_size = c.size();                       
434     for(auto& itr : c)                            
435     {                                             
436         if(!itr->is_native_task())                
437             --c_size;                             
438         else                                      
439         {                                         
440             //++(m_task_queue);                   
441             get_valid_queue(m_task_queue)->Ins    
442         }                                         
443     }                                             
444     c.clear();                                    
445                                                   
446     // notify sleeping threads                    
447     notify(c_size);                               
448                                                   
449     return c_size;                                
450 }                                                 
451 //--------------------------------------------    
452 template <typename FuncT>                         
453 inline void                                       
454 ThreadPool::execute_on_all_threads(FuncT&& _fu    
455 {                                                 
456     if(m_tbb_tp && m_tbb_task_group)              
457     {                                             
458 #if defined(PTL_USE_TBB)                          
459         // TBB lazily activates threads to pro    
460         // participates in processing the task    
461         // function to execute only on the wor    
462         //                                        
463         std::set<std::thread::id> _first{};       
464         Mutex                     _mutex{};       
465         // init function which executes functi    
466         auto _init = [&]() {                      
467             int _once = 0;                        
468             _mutex.lock();                        
469             if(_first.find(std::this_thread::g    
470             {                                     
471                 // we need to reset this threa    
472                 // of the same template instan    
473                 _once = 1;                        
474                 _first.insert(std::this_thread    
475             }                                     
476             _mutex.unlock();                      
477             if(_once != 0)                        
478             {                                     
479                 _func();                          
480                 return 1;                         
481             }                                     
482             return 0;                             
483         };                                        
484         // this will collect the number of thr    
485         // executed the _init function above      
486         std::atomic<size_t> _total_init{ 0 };     
487         // max parallelism by TBB                 
488         size_t _maxp = tbb_global_control()->a    
489             tbb::global_control::max_allowed_p    
490         // create a task arean                    
491         auto* _arena = get_task_arena();          
492         // size of the thread-pool                
493         size_t _sz = size();                      
494         // number of cores                        
495         size_t _ncore = GetNumberOfCores();       
496         // maximum depth for recursion            
497         size_t _dmax = std::max<size_t>(_ncore    
498         // how many threads we need to initial    
499         size_t _num = std::min(_maxp, std::min    
500         // this is the task passed to the task    
501         std::function<void()> _init_task;         
502         _init_task = [&]() {                      
503             add_thread_id();                      
504             static thread_local size_type _dep    
505             int                           _ret    
506             // don't let the main thread execu    
507             if(!is_main())                        
508             {                                     
509                 // execute the function           
510                 _ret = _init();                   
511                 // add the result                 
512                 _total_init += _ret;              
513             }                                     
514             // if the function did not return     
515             // two more tasks                     
516             ++_depth;                             
517             if(_ret == 0 && _depth < _dmax &&     
518             {                                     
519                 tbb::task_group tg{};             
520                 tg.run([&]() { _init_task(); }    
521                 tg.run([&]() { _init_task(); }    
522                 ThisThread::sleep_for(std::chr    
523                 tg.wait();                        
524             }                                     
525             --_depth;                             
526         };                                        
527                                                   
528         // TBB won't oversubscribe so we need     
529         size_t nitr        = 0;                   
530         auto   _fname      = __FUNCTION__;        
531         auto   _write_info = [&]() {              
532             std::cout << "[" << _fname << "]>     
533                       << ", expected: " << _nu    
534                       << ", size: " << _sz <<     
535         };                                        
536         while(_total_init < _num)                 
537         {                                         
538             auto _n = 2 * _num;                   
539             while(--_n > 0)                       
540             {                                     
541                 _arena->execute(                  
542                     [&]() { m_tbb_task_group->    
543             }                                     
544             _arena->execute([&]() { m_tbb_task    
545             // don't loop infinitely but use a    
546             if(nitr++ > 2 * (_num + 1) && (_to    
547             {                                     
548                 _write_info();                    
549                 break;                            
550             }                                     
551             // at this point we need to exit      
552             if(nitr > 4 * (_ncore + 1))           
553             {                                     
554                 _write_info();                    
555                 break;                            
556             }                                     
557         }                                         
558         if(get_verbose() > 3)                     
559             _write_info();                        
560 #endif                                            
561     }                                             
562     else if(get_queue())                          
563     {                                             
564         get_queue()->ExecuteOnAllThreads(this,    
565     }                                             
566 }                                                 
567                                                   
568 //--------------------------------------------    
569                                                   
570 template <typename FuncT>                         
571 inline void                                       
572 ThreadPool::execute_on_specific_threads(const     
573                                         FuncT&    
574 {                                                 
575     if(m_tbb_tp && m_tbb_task_group)              
576     {                                             
577 #if defined(PTL_USE_TBB)                          
578         // TBB lazily activates threads to pro    
579         // participates in processing the task    
580         // function to execute only on the wor    
581         //                                        
582         std::set<std::thread::id> _first{};       
583         Mutex                     _mutex{};       
584         // init function which executes functi    
585         auto _exec = [&]() {                      
586             int _once = 0;                        
587             _mutex.lock();                        
588             if(_first.find(std::this_thread::g    
589             {                                     
590                 // we need to reset this threa    
591                 // of the same template instan    
592                 _once = 1;                        
593                 _first.insert(std::this_thread    
594             }                                     
595             _mutex.unlock();                      
596             if(_once != 0)                        
597             {                                     
598                 _func();                          
599                 return 1;                         
600             }                                     
601             return 0;                             
602         };                                        
603         // this will collect the number of thr    
604         // executed the _exec function above      
605         std::atomic<size_t> _total_exec{ 0 };     
606         // number of cores                        
607         size_t _ncore = GetNumberOfCores();       
608         // maximum depth for recursion            
609         size_t _dmax = std::max<size_t>(_ncore    
610         // how many threads we need to initial    
611         size_t _num = _tids.size();               
612         // create a task arena                    
613         auto* _arena = get_task_arena();          
614         // this is the task passed to the task    
615         std::function<void()> _exec_task;         
616         _exec_task = [&]() {                      
617             add_thread_id();                      
618             static thread_local size_type _dep    
619             int                           _ret    
620             auto                          _thi    
621             // don't let the main thread execu    
622             if(_tids.count(_this_tid) > 0)        
623             {                                     
624                 // execute the function           
625                 _ret = _exec();                   
626                 // add the result                 
627                 _total_exec += _ret;              
628             }                                     
629             // if the function did not return     
630             // two more tasks                     
631             ++_depth;                             
632             if(_ret == 0 && _depth < _dmax &&     
633             {                                     
634                 tbb::task_group tg{};             
635                 tg.run([&]() { _exec_task(); }    
636                 tg.run([&]() { _exec_task(); }    
637                 ThisThread::sleep_for(std::chr    
638                 tg.wait();                        
639             }                                     
640             --_depth;                             
641         };                                        
642                                                   
643         // TBB won't oversubscribe so we need     
644         size_t nitr        = 0;                   
645         auto   _fname      = __FUNCTION__;        
646         auto   _write_info = [&]() {              
647             std::cout << "[" << _fname << "]>     
648                       << ", expected: " << _nu    
649         };                                        
650         while(_total_exec < _num)                 
651         {                                         
652             auto _n = 2 * _num;                   
653             while(--_n > 0)                       
654             {                                     
655                 _arena->execute(                  
656                     [&]() { m_tbb_task_group->    
657             }                                     
658             _arena->execute([&]() { m_tbb_task    
659             // don't loop infinitely but use a    
660             if(nitr++ > 2 * (_num + 1) && (_to    
661             {                                     
662                 _write_info();                    
663                 break;                            
664             }                                     
665             // at this point we need to exit      
666             if(nitr > 8 * (_num + 1))             
667             {                                     
668                 _write_info();                    
669                 break;                            
670             }                                     
671         }                                         
672         if(get_verbose() > 3)                     
673             _write_info();                        
674 #endif                                            
675     }                                             
676     else if(get_queue())                          
677     {                                             
678         get_queue()->ExecuteOnSpecificThreads(    
679     }                                             
680 }                                                 
681                                                   
682 //============================================    
683                                                   
684 }  // namespace PTL                               
685