Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/include/PTL/ThreadPool.hh

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3, ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

Diff markup

Differences between /externals/ptl/include/PTL/ThreadPool.hh (Version 11.3.0) and /externals/ptl/include/PTL/ThreadPool.hh (Version 10.7)


  1 //                                                  1 //
  2 // MIT License                                      2 // MIT License
  3 // Copyright (c) 2020 Jonathan R. Madsen            3 // Copyright (c) 2020 Jonathan R. Madsen
  4 // Permission is hereby granted, free of charg      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
  5 // of this software and associated documentati      5 // of this software and associated documentation files (the "Software"), to deal
  6 // in the Software without restriction, includ      6 // in the Software without restriction, including without limitation the rights
  7 // to use, copy, modify, merge, publish, distr      7 // to use, copy, modify, merge, publish, distribute, sublicense, and
  8 // copies of the Software, and to permit perso      8 // copies of the Software, and to permit persons to whom the Software is
  9 // furnished to do so, subject to the followin      9 // furnished to do so, subject to the following conditions:
 10 // The above copyright notice and this permiss     10 // The above copyright notice and this permission notice shall be included in
 11 // all copies or substantial portions of the S     11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR     12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT     13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH     14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR     15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS     16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI     17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 18 //                                                 18 //
 19 // -------------------------------------------     19 // ---------------------------------------------------------------
 20 // Tasking class header file                       20 // Tasking class header file
 21 //                                                 21 //
 22 // Class Description:                              22 // Class Description:
 23 //                                                 23 //
 24 // This file creates a class for an efficient      24 // This file creates a class for an efficient thread-pool that
 25 // accepts work in the form of tasks.              25 // accepts work in the form of tasks.
 26 //                                                 26 //
 27 // -------------------------------------------     27 // ---------------------------------------------------------------
 28 // Author: Jonathan Madsen (Feb 13th 2018)         28 // Author: Jonathan Madsen (Feb 13th 2018)
 29 // -------------------------------------------     29 // ---------------------------------------------------------------
 30                                                    30 
 31 #pragma once                                       31 #pragma once
 32                                                    32 
 33 #include "PTL/AutoLock.hh"                         33 #include "PTL/AutoLock.hh"
 34 #ifndef G4GMAKE                                << 
 35 #include ""                                    << 
 36 #endif                                         << 
 37 #include "PTL/ThreadData.hh"                       34 #include "PTL/ThreadData.hh"
 38 #include "PTL/Threading.hh"                        35 #include "PTL/Threading.hh"
 39 #include "PTL/Types.hh"                        << 
 40 #include "PTL/VTask.hh"                            36 #include "PTL/VTask.hh"
                                                   >>  37 #include "PTL/VTaskGroup.hh"
 41 #include "PTL/VUserTaskQueue.hh"                   38 #include "PTL/VUserTaskQueue.hh"
 42                                                    39 
 43 #if defined(PTL_USE_TBB)                       <<  40 #ifdef PTL_USE_TBB
 44 #    if !defined(TBB_SUPPRESS_DEPRECATED_MESSA << 
 45 #        define TBB_SUPPRESS_DEPRECATED_MESSAG << 
 46 #    endif                                     << 
 47 #    if !defined(TBB_PREVIEW_GLOBAL_CONTROL)   << 
 48 #        define TBB_PREVIEW_GLOBAL_CONTROL 1   << 
 49 #    endif                                     << 
 50 #    include <tbb/global_control.h>                41 #    include <tbb/global_control.h>
 51 #    include <tbb/task_arena.h>                <<  42 #    include <tbb/tbb.h>
 52 #    include <tbb/task_group.h>                << 
 53 #endif                                             43 #endif
 54                                                    44 
 55 #include <algorithm>                           <<  45 // C
 56 #include <atomic>                              << 
 57 #include <chrono>                              << 
 58 #include <cstdint>                                 46 #include <cstdint>
 59 #include <cstdlib>                                 47 #include <cstdlib>
                                                   >>  48 #include <cstring>
                                                   >>  49 // C++
                                                   >>  50 #include <atomic>
 60 #include <deque>                                   51 #include <deque>
 61 #include <functional>                          << 
 62 #include <iostream>                                52 #include <iostream>
 63 #include <map>                                     53 #include <map>
 64 #include <memory>                                  54 #include <memory>
 65 #include <mutex>  // IWYU pragma: keep         <<  55 #include <queue>
 66 #include <set>                                     56 #include <set>
 67 #include <thread>                              <<  57 #include <stack>
 68 #include <type_traits>  // IWYU pragma: keep   << 
 69 #include <unordered_map>                           58 #include <unordered_map>
 70 #include <utility>                             << 
 71 #include <vector>                                  59 #include <vector>
 72                                                    60 
 73 namespace PTL                                      61 namespace PTL
 74 {                                                  62 {
 75 namespace thread_pool                          << 
 76 {                                              << 
 77 namespace state                                << 
 78 {                                              << 
 79 static const short STARTED = 0;                << 
 80 static const short PARTIAL = 1;                << 
 81 static const short STOPPED = 2;                << 
 82 static const short NONINIT = 3;                << 
 83                                                << 
 84 }  // namespace state                          << 
 85 }  // namespace thread_pool                    << 
 86                                                << 
 87 class ThreadPool                                   63 class ThreadPool
 88 {                                                  64 {
 89 public:                                            65 public:
 90     template <typename KeyT, typename MappedT,     66     template <typename KeyT, typename MappedT, typename HashT = KeyT>
 91     using uomap = std::unordered_map<KeyT, Map     67     using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
 92                                                    68 
 93     // pod-types                                   69     // pod-types
 94     using size_type        = size_t;               70     using size_type        = size_t;
 95     using task_count_type  = std::shared_ptr<s     71     using task_count_type  = std::shared_ptr<std::atomic_uintmax_t>;
 96     using atomic_int_type  = std::shared_ptr<s     72     using atomic_int_type  = std::shared_ptr<std::atomic_uintmax_t>;
 97     using pool_state_type  = std::shared_ptr<s     73     using pool_state_type  = std::shared_ptr<std::atomic_short>;
 98     using atomic_bool_type = std::shared_ptr<s     74     using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
 99     // objects                                     75     // objects
100     using task_type    = VTask;                    76     using task_type    = VTask;
101     using lock_t       = std::shared_ptr<Mutex     77     using lock_t       = std::shared_ptr<Mutex>;
102     using condition_t  = std::shared_ptr<Condi     78     using condition_t  = std::shared_ptr<Condition>;
103     using task_pointer = std::shared_ptr<task_ <<  79     using task_pointer = task_type*;
104     using task_queue_t = VUserTaskQueue;           80     using task_queue_t = VUserTaskQueue;
105     // containers                                  81     // containers
106     using thread_list_t      = std::deque<Thre <<  82     typedef std::deque<ThreadId>          thread_list_t;
107     using bool_list_t        = std::vector<boo <<  83     typedef std::vector<bool>             bool_list_t;
108     using thread_id_map_t    = std::map<Thread <<  84     typedef std::map<ThreadId, uintmax_t> thread_id_map_t;
109     using thread_index_map_t = std::map<uintma <<  85     typedef std::map<uintmax_t, ThreadId> thread_index_map_t;
110     using thread_vec_t       = std::vector<Thr <<  86     using thread_vec_t = std::vector<Thread>;
111     using thread_data_t      = std::vector<std << 
112     // functions                                   87     // functions
113     using initialize_func_t = std::function<vo <<  88     typedef std::function<void()>             initialize_func_t;
114     using finalize_func_t   = std::function<vo <<  89     typedef std::function<intmax_t(intmax_t)> affinity_func_t;
115     using affinity_func_t   = std::function<in << 
116                                                    90 
117     static affinity_func_t& affinity_functor() <<  91 public:
118     {                                          <<  92     // Constructor and Destructors
119         static affinity_func_t _v = [](intmax_ <<  93     ThreadPool(
                                                   >>  94         const size_type& pool_size, VUserTaskQueue* task_queue = nullptr,
                                                   >>  95         bool _use_affinity     = GetEnv<bool>("PTL_CPU_AFFINITY", false),
                                                   >>  96         const affinity_func_t& = [](intmax_t) {
120             static std::atomic<intmax_t> assig     97             static std::atomic<intmax_t> assigned;
121             intmax_t                     _assi     98             intmax_t                     _assign = assigned++;
122             return _assign % Thread::hardware_     99             return _assign % Thread::hardware_concurrency();
123         };                                     << 100         });
124         return _v;                             << 101     // Virtual destructors are required by abstract classes
125     }                                          << 102     // so add it by default, just in case
126                                                << 103     virtual ~ThreadPool();
127     static initialize_func_t& initialization_f << 
128     {                                          << 
129         static initialize_func_t _v = []() {}; << 
130         return _v;                             << 
131     }                                          << 
132                                                << 
133     static finalize_func_t& finalization_funct << 
134     {                                          << 
135         static finalize_func_t _v = []() {};   << 
136         return _v;                             << 
137     }                                          << 
138                                                << 
139     struct Config                              << 
140     {                                          << 
141         bool              init         = true; << 
142         bool              use_tbb      = false << 
143         bool              use_affinity = false << 
144         int               verbose      = 0;    << 
145         int               priority     = 0;    << 
146         size_type         pool_size    = f_def << 
147         VUserTaskQueue*   task_queue   = nullp << 
148         affinity_func_t   set_affinity = affin << 
149         initialize_func_t initializer  = initi << 
150         finalize_func_t   finalizer    = final << 
151     };                                         << 
152                                                << 
153 public:                                        << 
154     // Constructor and Destructors             << 
155     explicit ThreadPool(const Config&);        << 
156     ~ThreadPool();                             << 
157     ThreadPool(const ThreadPool&) = delete;       104     ThreadPool(const ThreadPool&) = delete;
158     ThreadPool(ThreadPool&&)      = default;      105     ThreadPool(ThreadPool&&)      = default;
159     ThreadPool& operator=(const ThreadPool&) =    106     ThreadPool& operator=(const ThreadPool&) = delete;
160     ThreadPool& operator=(ThreadPool&&) = defa    107     ThreadPool& operator=(ThreadPool&&) = default;
161                                                   108 
162 public:                                           109 public:
163     // Public functions                           110     // Public functions
164     size_type initialize_threadpool(size_type)    111     size_type initialize_threadpool(size_type);  // start the threads
165     size_type destroy_threadpool();               112     size_type destroy_threadpool();              // destroy the threads
166     size_type stop_thread();                      113     size_type stop_thread();
167                                                   114 
168     template <typename FuncT>                     115     template <typename FuncT>
169     void execute_on_all_threads(FuncT&& _func)    116     void execute_on_all_threads(FuncT&& _func);
170                                                   117 
171     template <typename FuncT>                  << 
172     void execute_on_specific_threads(const std << 
173                                      FuncT&&   << 
174                                                << 
175     task_queue_t*  get_queue() const { return  << 
176     task_queue_t*& get_valid_queue(task_queue_ << 
177                                                << 
178     bool is_tbb_threadpool() const { return m_ << 
179                                                << 
180 public:                                           118 public:
181     /// set the default pool size              << 119     // Public functions related to TBB
182     static void set_default_size(size_type _v) << 120     static bool using_tbb();
183                                                << 121     // enable using TBB if available
184     /// get the default pool size              << 122     static void set_use_tbb(bool val);
185     static size_type get_default_size() { retu << 
186                                                   123 
187 public:                                           124 public:
188     // add tasks for threads to process           125     // add tasks for threads to process
189     size_type add_task(task_pointer&& task, in << 126     size_type add_task(task_pointer task, int bin = -1);
190     // size_type add_thread_task(ThreadId id,     127     // size_type add_thread_task(ThreadId id, task_pointer&& task);
191     // add a generic container with iterator      128     // add a generic container with iterator
192     template <typename ListT>                     129     template <typename ListT>
193     size_type add_tasks(ListT&);                  130     size_type add_tasks(ListT&);
194                                                   131 
195     Thread* get_thread(size_type _n) const;       132     Thread* get_thread(size_type _n) const;
196     Thread* get_thread(std::thread::id id) con    133     Thread* get_thread(std::thread::id id) const;
197                                                   134 
                                                   >> 135     task_queue_t* get_queue() const { return m_task_queue; }
                                                   >> 136 
198     // only relevant when compiled with PTL_US    137     // only relevant when compiled with PTL_USE_TBB
199     static tbb_global_control_t*& tbb_global_c    138     static tbb_global_control_t*& tbb_global_control();
200                                                   139 
201     void set_initialization(initialize_func_t  << 140     void set_initialization(initialize_func_t f) { m_init_func = f; }
202     void set_finalization(finalize_func_t f) { << 
203                                                << 
204     void reset_initialization()                   141     void reset_initialization()
205     {                                             142     {
206         m_init_func = []() {};                 << 143         auto f      = []() {};
207     }                                          << 144         m_init_func = f;
208     void reset_finalization()                  << 
209     {                                          << 
210         m_fini_func = []() {};                 << 
211     }                                             145     }
212                                                   146 
213 public:                                           147 public:
214     // get the pool state                         148     // get the pool state
215     const pool_state_type& state() const { ret    149     const pool_state_type& state() const { return m_pool_state; }
216     // see how many main task threads there ar    150     // see how many main task threads there are
217     size_type size() const { return m_pool_siz    151     size_type size() const { return m_pool_size; }
218     // set the thread pool size                   152     // set the thread pool size
219     void resize(size_type _n);                    153     void resize(size_type _n);
220     // affinity assigns threads to cores, assi    154     // affinity assigns threads to cores, assignment at constructor
221     bool using_affinity() const { return m_use    155     bool using_affinity() const { return m_use_affinity; }
222     bool is_alive() { return m_alive_flag->loa    156     bool is_alive() { return m_alive_flag->load(); }
223     void notify();                                157     void notify();
224     void notify_all();                            158     void notify_all();
225     void notify(size_type);                       159     void notify(size_type);
226     bool is_initialized() const;                  160     bool is_initialized() const;
227     int  get_active_threads_count() const { re << 161     int  get_active_threads_count() const
                                                   >> 162     {
                                                   >> 163         return (m_thread_awake) ? m_thread_awake->load() : 0;
                                                   >> 164     }
228                                                   165 
229     void set_affinity(affinity_func_t f) { m_a << 166     void set_affinity(affinity_func_t f) { m_affinity_func = f; }
230     void set_affinity(intmax_t i, Thread&) con << 167     void set_affinity(intmax_t i, Thread&);
231     void set_priority(int _prio, Thread&) cons << 
232                                                   168 
233     void set_verbose(int n) { m_verbose = n; }    169     void set_verbose(int n) { m_verbose = n; }
234     int  get_verbose() const { return m_verbos    170     int  get_verbose() const { return m_verbose; }
235     bool is_main() const { return ThisThread:: << 171     bool is_master() const { return ThisThread::get_id() == m_master_tid; }
236                                                << 
237     tbb_task_arena_t* get_task_arena();        << 
238                                                   172 
239 public:                                           173 public:
240     // read FORCE_NUM_THREADS environment vari    174     // read FORCE_NUM_THREADS environment variable
241     static const thread_id_map_t& get_thread_i    175     static const thread_id_map_t& get_thread_ids();
242     static uintmax_t              get_thread_i << 
243     static uintmax_t              get_this_thr    176     static uintmax_t              get_this_thread_id();
244     static uintmax_t              add_thread_i << 
245                                                   177 
246 private:                                       << 178 protected:
247     void execute_thread(VUserTaskQueue*);  //     179     void execute_thread(VUserTaskQueue*);  // function thread sits in
248     int  insert(task_pointer&&, int = -1);     << 180     int  insert(const task_pointer&, int = -1);
249     int  run_on_this(task_pointer&&);          << 181     int  run_on_this(task_pointer);
250                                                   182 
251 private:                                       << 183 protected:
252     // called in THREAD INIT                      184     // called in THREAD INIT
253     static void start_thread(ThreadPool*, thre << 185     static void start_thread(ThreadPool*, intmax_t = -1);
                                                   >> 186 
                                                   >> 187     void record_entry()
                                                   >> 188     {
                                                   >> 189         if(m_thread_active)
                                                   >> 190             ++(*m_thread_active);
                                                   >> 191     }
254                                                   192 
255     void record_entry();                       << 193     void record_exit()
256     void record_exit();                        << 194     {
                                                   >> 195         if(m_thread_active)
                                                   >> 196             --(*m_thread_active);
                                                   >> 197     }
257                                                   198 
258 private:                                          199 private:
259     // Private variables                          200     // Private variables
260     // random                                     201     // random
261     bool             m_use_affinity      = fal << 202     bool             m_use_affinity;
262     bool             m_tbb_tp            = fal << 203     bool             m_tbb_tp;
263     bool             m_delete_task_queue = fal << 204     int              m_verbose   = 0;
264     int              m_verbose           = 0;  << 205     size_type        m_pool_size = 0;
265     int              m_priority          = 0;  << 206     ThreadId         m_master_tid;
266     size_type        m_pool_size         = 0;  << 207     atomic_bool_type m_alive_flag   = std::make_shared<std::atomic_bool>(false);
267     ThreadId         m_main_tid          = Thi << 208     pool_state_type  m_pool_state   = std::make_shared<std::atomic_short>(0);
268     atomic_bool_type m_alive_flag        = std << 209     atomic_int_type  m_thread_awake = std::make_shared<std::atomic_uintmax_t>();
269     pool_state_type  m_pool_state        = std << 210     atomic_int_type  m_thread_active = std::make_shared<std::atomic_uintmax_t>();
270     atomic_int_type  m_thread_awake      = std << 
271     atomic_int_type  m_thread_active     = std << 
272                                                   211 
273     // locks                                      212     // locks
274     lock_t m_task_lock = std::make_shared<Mute    213     lock_t m_task_lock = std::make_shared<Mutex>();
275     // conditions                                 214     // conditions
276     condition_t m_task_cond = std::make_shared    215     condition_t m_task_cond = std::make_shared<Condition>();
277                                                   216 
278     // containers                                 217     // containers
279     bool_list_t   m_is_joined    = {};  // joi << 218     bool_list_t   m_is_joined;     // join list
280     bool_list_t   m_is_stopped   = {};  // let << 219     bool_list_t   m_is_stopped;    // lets thread know to stop
281     thread_list_t m_main_threads = {};  // sto << 220     thread_list_t m_main_threads;  // storage for active threads
282     thread_list_t m_stop_threads = {};  // sto << 221     thread_list_t m_stop_threads;  // storage for stopped threads
283     thread_vec_t  m_threads      = {};         << 222     thread_vec_t  m_threads;
284     thread_data_t m_thread_data  = {};         << 
285                                                   223 
286     // task queue                                 224     // task queue
287     task_queue_t*     m_task_queue     = nullp << 225     task_queue_t*     m_task_queue;
288     tbb_task_arena_t* m_tbb_task_arena = nullp << 226     tbb_task_group_t* m_tbb_task_group;
289     tbb_task_group_t* m_tbb_task_group = nullp << 
290                                                   227 
291     // functions                                  228     // functions
292     initialize_func_t m_init_func     = initia << 229     initialize_func_t m_init_func;
293     finalize_func_t   m_fini_func     = finali << 230     affinity_func_t   m_affinity_func;
294     affinity_func_t   m_affinity_func = affini << 
295                                                   231 
296 private:                                          232 private:
297     static size_type&       f_default_pool_siz << 233     // Private static variables
298     static thread_id_map_t& f_thread_ids();    << 234     PTL_DLL static thread_id_map_t f_thread_ids;
                                                   >> 235     PTL_DLL static bool            f_use_tbb;
299 };                                                236 };
300                                                   237 
301 //--------------------------------------------    238 //--------------------------------------------------------------------------------------//
302 inline void                                       239 inline void
303 ThreadPool::notify()                              240 ThreadPool::notify()
304 {                                                 241 {
305     // wake up one thread that is waiting for     242     // wake up one thread that is waiting for a task to be available
306     if(m_thread_awake->load() < m_pool_size)   << 243     if(m_thread_awake && m_thread_awake->load() < m_pool_size)
307     {                                             244     {
308         AutoLock l(*m_task_lock);                 245         AutoLock l(*m_task_lock);
309         m_task_cond->notify_one();                246         m_task_cond->notify_one();
310     }                                             247     }
311 }                                                 248 }
312 //--------------------------------------------    249 //--------------------------------------------------------------------------------------//
313 inline void                                       250 inline void
314 ThreadPool::notify_all()                          251 ThreadPool::notify_all()
315 {                                                 252 {
316     // wake all threads                           253     // wake all threads
317     AutoLock l(*m_task_lock);                     254     AutoLock l(*m_task_lock);
318     m_task_cond->notify_all();                    255     m_task_cond->notify_all();
319 }                                                 256 }
320 //--------------------------------------------    257 //--------------------------------------------------------------------------------------//
321 inline void                                       258 inline void
322 ThreadPool::notify(size_type ntasks)              259 ThreadPool::notify(size_type ntasks)
323 {                                                 260 {
324     if(ntasks == 0)                               261     if(ntasks == 0)
325         return;                                   262         return;
326                                                   263 
327     // wake up as many threads that tasks just    264     // wake up as many threads that tasks just added
328     if(m_thread_awake->load() < m_pool_size)   << 265     if(m_thread_awake && m_thread_awake->load() < m_pool_size)
329     {                                             266     {
330         AutoLock l(*m_task_lock);                 267         AutoLock l(*m_task_lock);
331         if(ntasks < this->size())                 268         if(ntasks < this->size())
332         {                                         269         {
333             for(size_type i = 0; i < ntasks; +    270             for(size_type i = 0; i < ntasks; ++i)
334                 m_task_cond->notify_one();        271                 m_task_cond->notify_one();
335         }                                         272         }
336         else                                      273         else
337         {                                      << 
338             m_task_cond->notify_all();            274             m_task_cond->notify_all();
339         }                                      << 
340     }                                             275     }
341 }                                                 276 }
342 //--------------------------------------------    277 //--------------------------------------------------------------------------------------//
343 // local function for getting the tbb task sch    278 // local function for getting the tbb task scheduler
344 inline tbb_global_control_t*&                     279 inline tbb_global_control_t*&
345 ThreadPool::tbb_global_control()                  280 ThreadPool::tbb_global_control()
346 {                                                 281 {
347     static thread_local tbb_global_control_t*     282     static thread_local tbb_global_control_t* _instance = nullptr;
348     return _instance;                             283     return _instance;
349 }                                                 284 }
350 //--------------------------------------------    285 //--------------------------------------------------------------------------------------//
351 // task arena                                  << 
352 inline tbb_task_arena_t*                       << 
353 ThreadPool::get_task_arena()                   << 
354 {                                              << 
355 #if defined(PTL_USE_TBB)                       << 
356     // create a task arena                     << 
357     if(!m_tbb_task_arena)                      << 
358     {                                          << 
359         auto _sz = (tbb_global_control())      << 
360                        ? tbb_global_control()- << 
361                              tbb::global_contr << 
362                        : size();               << 
363         m_tbb_task_arena = new tbb_task_arena_ << 
364         m_tbb_task_arena->initialize(_sz, 1);  << 
365     }                                          << 
366 #else                                          << 
367     if(!m_tbb_task_arena)                      << 
368         m_tbb_task_arena = new tbb_task_arena_ << 
369 #endif                                         << 
370     return m_tbb_task_arena;                   << 
371 }                                              << 
372 //-------------------------------------------- << 
373 inline void                                       286 inline void
374 ThreadPool::resize(size_type _n)                  287 ThreadPool::resize(size_type _n)
375 {                                                 288 {
                                                   >> 289     if(_n == m_pool_size)
                                                   >> 290         return;
376     initialize_threadpool(_n);                    291     initialize_threadpool(_n);
377     if(m_task_queue)                           << 292     m_task_queue->resize(static_cast<intmax_t>(_n));
378         m_task_queue->resize(static_cast<intma << 
379 }                                                 293 }
380 //--------------------------------------------    294 //--------------------------------------------------------------------------------------//
381 inline int                                        295 inline int
382 ThreadPool::run_on_this(task_pointer&& _task)  << 296 ThreadPool::run_on_this(task_pointer task)
383 {                                                 297 {
384     auto&& _func = [_task]() { (*_task)(); };  << 298     auto _func = [=]() {
                                                   >> 299         (*task)();
                                                   >> 300         if(!task->group())
                                                   >> 301             delete task;
                                                   >> 302     };
385                                                   303 
386     if(m_tbb_tp && m_tbb_task_group)              304     if(m_tbb_tp && m_tbb_task_group)
387     {                                             305     {
388         auto* _arena = get_task_arena();       << 306         m_tbb_task_group->run(_func);
389         _arena->execute([this, _func]() { this << 
390     }                                             307     }
391     else                                          308     else
392     {                                             309     {
393         _func();                                  310         _func();
394     }                                             311     }
395     // return the number of tasks added to tas    312     // return the number of tasks added to task-list
396     return 0;                                     313     return 0;
397 }                                                 314 }
398 //--------------------------------------------    315 //--------------------------------------------------------------------------------------//
399 inline int                                        316 inline int
400 ThreadPool::insert(task_pointer&& task, int bi << 317 ThreadPool::insert(const task_pointer& task, int bin)
401 {                                                 318 {
402     static thread_local ThreadData* _data = Th    319     static thread_local ThreadData* _data = ThreadData::GetInstance();
403                                                   320 
404     // pass the task to the queue                 321     // pass the task to the queue
405     auto ibin = get_valid_queue(m_task_queue)- << 322     auto ibin = m_task_queue->InsertTask(task, _data, bin);
406     notify();                                     323     notify();
407     return (int)ibin;                          << 324     return ibin;
408 }                                                 325 }
409 //--------------------------------------------    326 //--------------------------------------------------------------------------------------//
410 inline ThreadPool::size_type                      327 inline ThreadPool::size_type
411 ThreadPool::add_task(task_pointer&& task, int  << 328 ThreadPool::add_task(task_pointer task, int bin)
412 {                                                 329 {
413     // if not native (i.e. TBB) or we haven't  << 330     // if not native (i.e. TBB) then return
414     if(m_tbb_tp || !task->is_native_task() ||  << 331     if(!task->is_native_task())
415         return static_cast<size_type>(run_on_t << 332         return 0;
416                                                   333 
417     return static_cast<size_type>(insert(std:: << 334     // if we haven't built thread-pool, just execute
                                                   >> 335     if(!m_alive_flag->load())
                                                   >> 336         return static_cast<size_type>(run_on_this(task));
                                                   >> 337 
                                                   >> 338     return static_cast<size_type>(insert(task, bin));
418 }                                                 339 }
419 //--------------------------------------------    340 //--------------------------------------------------------------------------------------//
420 template <typename ListT>                         341 template <typename ListT>
421 inline ThreadPool::size_type                      342 inline ThreadPool::size_type
422 ThreadPool::add_tasks(ListT& c)                   343 ThreadPool::add_tasks(ListT& c)
423 {                                                 344 {
424     if(!m_alive_flag)  // if we haven't built     345     if(!m_alive_flag)  // if we haven't built thread-pool, just execute
425     {                                             346     {
426         for(auto& itr : c)                        347         for(auto& itr : c)
427             run(itr);                             348             run(itr);
428         c.clear();                                349         c.clear();
429         return 0;                                 350         return 0;
430     }                                             351     }
431                                                   352 
432     // TODO: put a limit on how many tasks can    353     // TODO: put a limit on how many tasks can be added at most
433     auto c_size = c.size();                       354     auto c_size = c.size();
434     for(auto& itr : c)                            355     for(auto& itr : c)
435     {                                             356     {
436         if(!itr->is_native_task())                357         if(!itr->is_native_task())
437             --c_size;                             358             --c_size;
438         else                                      359         else
439         {                                         360         {
440             //++(m_task_queue);                   361             //++(m_task_queue);
441             get_valid_queue(m_task_queue)->Ins << 362             m_task_queue->InsertTask(itr);
442         }                                         363         }
443     }                                             364     }
444     c.clear();                                    365     c.clear();
445                                                   366 
446     // notify sleeping threads                    367     // notify sleeping threads
447     notify(c_size);                               368     notify(c_size);
448                                                   369 
449     return c_size;                                370     return c_size;
450 }                                                 371 }
451 //--------------------------------------------    372 //--------------------------------------------------------------------------------------//
452 template <typename FuncT>                         373 template <typename FuncT>
453 inline void                                       374 inline void
454 ThreadPool::execute_on_all_threads(FuncT&& _fu    375 ThreadPool::execute_on_all_threads(FuncT&& _func)
455 {                                                 376 {
456     if(m_tbb_tp && m_tbb_task_group)              377     if(m_tbb_tp && m_tbb_task_group)
457     {                                             378     {
458 #if defined(PTL_USE_TBB)                          379 #if defined(PTL_USE_TBB)
459         // TBB lazily activates threads to pro << 380         // TBB lazily activates threads to process tasks and the master thread
460         // participates in processing the task    381         // participates in processing the tasks so getting a specific
461         // function to execute only on the wor    382         // function to execute only on the worker threads requires some trickery
462         //                                        383         //
463         std::set<std::thread::id> _first{};    << 384         auto                      master_tid = ThisThread::get_id();
464         Mutex                     _mutex{};    << 385         std::set<std::thread::id> _first;
                                                   >> 386         Mutex                     _mutex;
465         // init function which executes functi    387         // init function which executes function and returns 1 only once
466         auto _init = [&]() {                      388         auto _init = [&]() {
467             int _once = 0;                     << 389             static thread_local int _once = 0;
468             _mutex.lock();                        390             _mutex.lock();
469             if(_first.find(std::this_thread::g    391             if(_first.find(std::this_thread::get_id()) == _first.end())
470             {                                     392             {
471                 // we need to reset this threa    393                 // we need to reset this thread-local static for multiple invocations
472                 // of the same template instan    394                 // of the same template instantiation
473                 _once = 1;                     << 395                 _once = 0;
474                 _first.insert(std::this_thread    396                 _first.insert(std::this_thread::get_id());
475             }                                     397             }
476             _mutex.unlock();                      398             _mutex.unlock();
477             if(_once != 0)                     << 399             if(_once++ == 0)
478             {                                     400             {
479                 _func();                          401                 _func();
480                 return 1;                         402                 return 1;
481             }                                     403             }
482             return 0;                             404             return 0;
483         };                                        405         };
                                                   >> 406         // consumes approximately N milliseconds of cpu time
                                                   >> 407         auto _consume = [](long n) {
                                                   >> 408             using stl_mutex_t   = std::mutex;
                                                   >> 409             using unique_lock_t = std::unique_lock<stl_mutex_t>;
                                                   >> 410             // a mutex held by one lock
                                                   >> 411             stl_mutex_t mutex;
                                                   >> 412             // acquire lock
                                                   >> 413             unique_lock_t hold_lk(mutex);
                                                   >> 414             // associate but defer
                                                   >> 415             unique_lock_t try_lk(mutex, std::defer_lock);
                                                   >> 416             // get current time
                                                   >> 417             auto now = std::chrono::steady_clock::now();
                                                   >> 418             // try until time point
                                                   >> 419             while(std::chrono::steady_clock::now() < (now + std::chrono::milliseconds(n)))
                                                   >> 420                 try_lk.try_lock();
                                                   >> 421         };
484         // this will collect the number of thr    422         // this will collect the number of threads which have
485         // executed the _init function above      423         // executed the _init function above
486         std::atomic<size_t> _total_init{ 0 };     424         std::atomic<size_t> _total_init{ 0 };
487         // max parallelism by TBB              << 
488         size_t _maxp = tbb_global_control()->a << 
489             tbb::global_control::max_allowed_p << 
490         // create a task arean                 << 
491         auto* _arena = get_task_arena();       << 
492         // size of the thread-pool             << 
493         size_t _sz = size();                   << 
494         // number of cores                     << 
495         size_t _ncore = GetNumberOfCores();    << 
496         // maximum depth for recursion         << 
497         size_t _dmax = std::max<size_t>(_ncore << 
498         // how many threads we need to initial << 
499         size_t _num = std::min(_maxp, std::min << 
500         // this is the task passed to the task    425         // this is the task passed to the task-group
501         std::function<void()> _init_task;      << 426         auto _init_task = [&]() {
502         _init_task = [&]() {                   << 427             int _ret = 0;
503             add_thread_id();                   << 428             // don't let the master thread execute the function
504             static thread_local size_type _dep << 429             if(ThisThread::get_id() != master_tid)
505             int                           _ret << 
506             // don't let the main thread execu << 
507             if(!is_main())                     << 
508             {                                     430             {
509                 // execute the function           431                 // execute the function
510                 _ret = _init();                   432                 _ret = _init();
511                 // add the result                 433                 // add the result
512                 _total_init += _ret;              434                 _total_init += _ret;
513             }                                     435             }
514             // if the function did not return  << 436             // if the function did not return anything, put it to sleep
515             // two more tasks                  << 437             // so TBB will wake other threads to execute the remaining tasks
516             ++_depth;                          << 438             if(_ret == 0)
517             if(_ret == 0 && _depth < _dmax &&  << 439                 _consume(100);
518             {                                  << 
519                 tbb::task_group tg{};          << 
520                 tg.run([&]() { _init_task(); } << 
521                 tg.run([&]() { _init_task(); } << 
522                 ThisThread::sleep_for(std::chr << 
523                 tg.wait();                     << 
524             }                                  << 
525             --_depth;                          << 
526         };                                        440         };
527                                                   441 
528         // TBB won't oversubscribe so we need     442         // TBB won't oversubscribe so we need to limit by ncores - 1
529         size_t nitr        = 0;                << 443         size_t nitr  = 0;
                                                   >> 444         size_t _maxp = tbb_global_control()->active_value(
                                                   >> 445             tbb::global_control::max_allowed_parallelism);
                                                   >> 446         size_t _sz         = size();
                                                   >> 447         size_t _ncore      = Threading::GetNumberOfCores() - 1;
                                                   >> 448         size_t _num        = std::min(_maxp, std::min(_sz, _ncore));
530         auto   _fname      = __FUNCTION__;        449         auto   _fname      = __FUNCTION__;
531         auto   _write_info = [&]() {              450         auto   _write_info = [&]() {
532             std::cout << "[" << _fname << "]>  << 451             std::cerr << "[" << _fname << "]> Total initalized: " << _total_init
533                       << ", expected: " << _nu    452                       << ", expected: " << _num << ", max-parallel: " << _maxp
534                       << ", size: " << _sz <<     453                       << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
535         };                                        454         };
536         while(_total_init < _num)                 455         while(_total_init < _num)
537         {                                         456         {
538             auto _n = 2 * _num;                << 457             auto _n = _num;
539             while(--_n > 0)                       458             while(--_n > 0)
540             {                                  << 459                 m_tbb_task_group->run(_init_task);
541                 _arena->execute(               << 460             m_tbb_task_group->wait();
542                     [&]() { m_tbb_task_group-> << 
543             }                                  << 
544             _arena->execute([&]() { m_tbb_task << 
545             // don't loop infinitely but use a    461             // don't loop infinitely but use a strict condition
546             if(nitr++ > 2 * (_num + 1) && (_to    462             if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
547             {                                     463             {
548                 _write_info();                    464                 _write_info();
549                 break;                            465                 break;
550             }                                     466             }
551             // at this point we need to exit      467             // at this point we need to exit
552             if(nitr > 4 * (_ncore + 1))           468             if(nitr > 4 * (_ncore + 1))
553             {                                     469             {
554                 _write_info();                    470                 _write_info();
555                 break;                            471                 break;
556             }                                     472             }
557         }                                         473         }
558         if(get_verbose() > 3)                     474         if(get_verbose() > 3)
559             _write_info();                        475             _write_info();
560 #endif                                            476 #endif
561     }                                             477     }
562     else if(get_queue())                          478     else if(get_queue())
563     {                                             479     {
564         get_queue()->ExecuteOnAllThreads(this,    480         get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
565     }                                             481     }
566 }                                                 482 }
567                                                << 
568 //-------------------------------------------- << 
569                                                << 
570 template <typename FuncT>                      << 
571 inline void                                    << 
572 ThreadPool::execute_on_specific_threads(const  << 
573                                         FuncT& << 
574 {                                              << 
575     if(m_tbb_tp && m_tbb_task_group)           << 
576     {                                          << 
577 #if defined(PTL_USE_TBB)                       << 
578         // TBB lazily activates threads to pro << 
579         // participates in processing the task << 
580         // function to execute only on the wor << 
581         //                                     << 
582         std::set<std::thread::id> _first{};    << 
583         Mutex                     _mutex{};    << 
584         // init function which executes functi << 
585         auto _exec = [&]() {                   << 
586             int _once = 0;                     << 
587             _mutex.lock();                     << 
588             if(_first.find(std::this_thread::g << 
589             {                                  << 
590                 // we need to reset this threa << 
591                 // of the same template instan << 
592                 _once = 1;                     << 
593                 _first.insert(std::this_thread << 
594             }                                  << 
595             _mutex.unlock();                   << 
596             if(_once != 0)                     << 
597             {                                  << 
598                 _func();                       << 
599                 return 1;                      << 
600             }                                  << 
601             return 0;                          << 
602         };                                     << 
603         // this will collect the number of thr << 
604         // executed the _exec function above   << 
605         std::atomic<size_t> _total_exec{ 0 };  << 
606         // number of cores                     << 
607         size_t _ncore = GetNumberOfCores();    << 
608         // maximum depth for recursion         << 
609         size_t _dmax = std::max<size_t>(_ncore << 
610         // how many threads we need to initial << 
611         size_t _num = _tids.size();            << 
612         // create a task arena                 << 
613         auto* _arena = get_task_arena();       << 
614         // this is the task passed to the task << 
615         std::function<void()> _exec_task;      << 
616         _exec_task = [&]() {                   << 
617             add_thread_id();                   << 
618             static thread_local size_type _dep << 
619             int                           _ret << 
620             auto                          _thi << 
621             // don't let the main thread execu << 
622             if(_tids.count(_this_tid) > 0)     << 
623             {                                  << 
624                 // execute the function        << 
625                 _ret = _exec();                << 
626                 // add the result              << 
627                 _total_exec += _ret;           << 
628             }                                  << 
629             // if the function did not return  << 
630             // two more tasks                  << 
631             ++_depth;                          << 
632             if(_ret == 0 && _depth < _dmax &&  << 
633             {                                  << 
634                 tbb::task_group tg{};          << 
635                 tg.run([&]() { _exec_task(); } << 
636                 tg.run([&]() { _exec_task(); } << 
637                 ThisThread::sleep_for(std::chr << 
638                 tg.wait();                     << 
639             }                                  << 
640             --_depth;                          << 
641         };                                     << 
642                                                << 
643         // TBB won't oversubscribe so we need  << 
644         size_t nitr        = 0;                << 
645         auto   _fname      = __FUNCTION__;     << 
646         auto   _write_info = [&]() {           << 
647             std::cout << "[" << _fname << "]>  << 
648                       << ", expected: " << _nu << 
649         };                                     << 
650         while(_total_exec < _num)              << 
651         {                                      << 
652             auto _n = 2 * _num;                << 
653             while(--_n > 0)                    << 
654             {                                  << 
655                 _arena->execute(               << 
656                     [&]() { m_tbb_task_group-> << 
657             }                                  << 
658             _arena->execute([&]() { m_tbb_task << 
659             // don't loop infinitely but use a << 
660             if(nitr++ > 2 * (_num + 1) && (_to << 
661             {                                  << 
662                 _write_info();                 << 
663                 break;                         << 
664             }                                  << 
665             // at this point we need to exit   << 
666             if(nitr > 8 * (_num + 1))          << 
667             {                                  << 
668                 _write_info();                 << 
669                 break;                         << 
670             }                                  << 
671         }                                      << 
672         if(get_verbose() > 3)                  << 
673             _write_info();                     << 
674 #endif                                         << 
675     }                                          << 
676     else if(get_queue())                       << 
677     {                                          << 
678         get_queue()->ExecuteOnSpecificThreads( << 
679     }                                          << 
680 }                                              << 
681                                                << 
682 //============================================    483 //======================================================================================//
683                                                   484 
684 }  // namespace PTL                               485 }  // namespace PTL
685                                                   486