Geant4 Cross Reference |
1 // 1 // 2 // MIT License 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentati 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, includ 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distr 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit perso 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the followin 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permiss 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the S 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 18 // 19 // ------------------------------------------- 19 // --------------------------------------------------------------- 20 // Tasking class header file 20 // Tasking class header file 21 // 21 // 22 // Class Description: 22 // Class Description: 23 // 23 // 24 // This file creates a class for an efficient 24 // This file creates a class for an efficient thread-pool that 25 // accepts work in the form of tasks. 25 // accepts work in the form of tasks. 26 // 26 // 27 // ------------------------------------------- 27 // --------------------------------------------------------------- 28 // Author: Jonathan Madsen (Feb 13th 2018) 28 // Author: Jonathan Madsen (Feb 13th 2018) 29 // ------------------------------------------- 29 // --------------------------------------------------------------- 30 30 31 #pragma once 31 #pragma once 32 32 33 #include "PTL/AutoLock.hh" 33 #include "PTL/AutoLock.hh" 34 #ifndef G4GMAKE 34 #ifndef G4GMAKE 35 #include "" 35 #include "" 36 #endif 36 #endif 37 #include "PTL/ThreadData.hh" 37 #include "PTL/ThreadData.hh" 38 #include "PTL/Threading.hh" 38 #include "PTL/Threading.hh" 39 #include "PTL/Types.hh" 39 #include "PTL/Types.hh" 40 #include "PTL/VTask.hh" 40 #include "PTL/VTask.hh" 41 #include "PTL/VUserTaskQueue.hh" 41 #include "PTL/VUserTaskQueue.hh" 42 42 43 #if defined(PTL_USE_TBB) 43 #if defined(PTL_USE_TBB) 44 # if !defined(TBB_SUPPRESS_DEPRECATED_MESSA 44 # if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES) 45 # define TBB_SUPPRESS_DEPRECATED_MESSAG 45 # define TBB_SUPPRESS_DEPRECATED_MESSAGES 1 46 # endif 46 # endif 47 # if !defined(TBB_PREVIEW_GLOBAL_CONTROL) 47 # if !defined(TBB_PREVIEW_GLOBAL_CONTROL) 48 # define TBB_PREVIEW_GLOBAL_CONTROL 1 48 # define TBB_PREVIEW_GLOBAL_CONTROL 1 49 # endif 49 # endif 50 # include <tbb/global_control.h> 50 # include <tbb/global_control.h> 51 # include <tbb/task_arena.h> 51 # include <tbb/task_arena.h> 52 # include <tbb/task_group.h> 52 # include <tbb/task_group.h> 53 #endif 53 #endif 54 54 55 #include <algorithm> 55 #include <algorithm> 56 #include <atomic> 56 #include <atomic> 57 #include <chrono> 57 #include <chrono> 58 #include <cstdint> 58 #include <cstdint> 59 #include <cstdlib> 59 #include <cstdlib> 60 #include <deque> 60 #include <deque> 61 #include <functional> 61 #include <functional> 62 #include <iostream> 62 #include <iostream> 63 #include <map> 63 #include <map> 64 #include <memory> 64 #include <memory> 65 #include <mutex> // IWYU pragma: keep 65 #include <mutex> // IWYU pragma: keep 66 #include <set> 66 #include <set> 67 #include <thread> 67 #include <thread> 68 #include <type_traits> // IWYU pragma: keep 68 #include <type_traits> // IWYU pragma: keep 69 #include <unordered_map> 69 #include <unordered_map> 70 #include <utility> 70 #include <utility> 71 #include <vector> 71 #include <vector> 72 72 73 namespace PTL 73 namespace PTL 74 { 74 { 75 namespace thread_pool 75 namespace thread_pool 76 { 76 { 77 namespace state 77 namespace state 78 { 78 { 79 static const short STARTED = 0; 79 static const short STARTED = 0; 80 static const short PARTIAL = 1; 80 static const short PARTIAL = 1; 81 static const short STOPPED = 2; 81 static const short STOPPED = 2; 82 static const short NONINIT = 3; 82 static const short NONINIT = 3; 83 83 84 } // namespace state 84 } // namespace state 85 } // namespace thread_pool 85 } // namespace thread_pool 86 86 87 class ThreadPool 87 class ThreadPool 88 { 88 { 89 public: 89 public: 90 template <typename KeyT, typename MappedT, 90 template <typename KeyT, typename MappedT, typename HashT = KeyT> 91 using uomap = std::unordered_map<KeyT, Map 91 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>; 92 92 93 // pod-types 93 // pod-types 94 using size_type = size_t; 94 using size_type = size_t; 95 using task_count_type = std::shared_ptr<s 95 using task_count_type = std::shared_ptr<std::atomic_uintmax_t>; 96 using atomic_int_type = std::shared_ptr<s 96 using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>; 97 using pool_state_type = std::shared_ptr<s 97 using pool_state_type = std::shared_ptr<std::atomic_short>; 98 using atomic_bool_type = std::shared_ptr<s 98 using atomic_bool_type = std::shared_ptr<std::atomic_bool>; 99 // objects 99 // objects 100 using task_type = VTask; 100 using task_type = VTask; 101 using lock_t = std::shared_ptr<Mutex 101 using lock_t = std::shared_ptr<Mutex>; 102 using condition_t = std::shared_ptr<Condi 102 using condition_t = std::shared_ptr<Condition>; 103 using task_pointer = std::shared_ptr<task_ 103 using task_pointer = std::shared_ptr<task_type>; 104 using task_queue_t = VUserTaskQueue; 104 using task_queue_t = VUserTaskQueue; 105 // containers 105 // containers 106 using thread_list_t = std::deque<Thre 106 using thread_list_t = std::deque<ThreadId>; 107 using bool_list_t = std::vector<boo 107 using bool_list_t = std::vector<bool>; 108 using thread_id_map_t = std::map<Thread 108 using thread_id_map_t = std::map<ThreadId, uintmax_t>; 109 using thread_index_map_t = std::map<uintma 109 using thread_index_map_t = std::map<uintmax_t, ThreadId>; 110 using thread_vec_t = std::vector<Thr 110 using thread_vec_t = std::vector<Thread>; 111 using thread_data_t = std::vector<std 111 using thread_data_t = std::vector<std::shared_ptr<ThreadData>>; 112 // functions 112 // functions 113 using initialize_func_t = std::function<vo 113 using initialize_func_t = std::function<void()>; 114 using finalize_func_t = std::function<vo 114 using finalize_func_t = std::function<void()>; 115 using affinity_func_t = std::function<in 115 using affinity_func_t = std::function<intmax_t(intmax_t)>; 116 116 117 static affinity_func_t& affinity_functor() 117 static affinity_func_t& affinity_functor() 118 { 118 { 119 static affinity_func_t _v = [](intmax_ 119 static affinity_func_t _v = [](intmax_t) { 120 static std::atomic<intmax_t> assig 120 static std::atomic<intmax_t> assigned; 121 intmax_t _assi 121 intmax_t _assign = assigned++; 122 return _assign % Thread::hardware_ 122 return _assign % Thread::hardware_concurrency(); 123 }; 123 }; 124 return _v; 124 return _v; 125 } 125 } 126 126 127 static initialize_func_t& initialization_f 127 static initialize_func_t& initialization_functor() 128 { 128 { 129 static initialize_func_t _v = []() {}; 129 static initialize_func_t _v = []() {}; 130 return _v; 130 return _v; 131 } 131 } 132 132 133 static finalize_func_t& finalization_funct 133 static finalize_func_t& finalization_functor() 134 { 134 { 135 static finalize_func_t _v = []() {}; 135 static finalize_func_t _v = []() {}; 136 return _v; 136 return _v; 137 } 137 } 138 138 139 struct Config 139 struct Config 140 { 140 { >> 141 PTL_DEFAULT_OBJECT(Config) >> 142 >> 143 Config(bool, bool, bool, int, int, size_type, VUserTaskQueue*, affinity_func_t, >> 144 initialize_func_t, finalize_func_t); >> 145 141 bool init = true; 146 bool init = true; 142 bool use_tbb = false << 147 bool use_tbb = f_use_tbb(); 143 bool use_affinity = false << 148 bool use_affinity = f_use_cpu_affinity(); 144 int verbose = 0; << 149 int verbose = f_verbose(); 145 int priority = 0; << 150 int priority = f_thread_priority(); 146 size_type pool_size = f_def 151 size_type pool_size = f_default_pool_size(); 147 VUserTaskQueue* task_queue = nullp 152 VUserTaskQueue* task_queue = nullptr; 148 affinity_func_t set_affinity = affin 153 affinity_func_t set_affinity = affinity_functor(); 149 initialize_func_t initializer = initi 154 initialize_func_t initializer = initialization_functor(); 150 finalize_func_t finalizer = final 155 finalize_func_t finalizer = finalization_functor(); 151 }; 156 }; 152 157 153 public: 158 public: 154 // Constructor and Destructors 159 // Constructor and Destructors 155 explicit ThreadPool(const Config&); 160 explicit ThreadPool(const Config&); 156 ~ThreadPool(); << 161 ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue = nullptr, >> 162 bool _use_affinity = f_use_cpu_affinity(), >> 163 affinity_func_t = affinity_functor(), >> 164 initialize_func_t = initialization_functor(), >> 165 finalize_func_t = finalization_functor()); >> 166 ThreadPool(const size_type& pool_size, initialize_func_t, finalize_func_t, >> 167 bool _use_affinity = f_use_cpu_affinity(), >> 168 affinity_func_t = affinity_functor(), >> 169 VUserTaskQueue* task_queue = nullptr); >> 170 virtual ~ThreadPool(); 157 ThreadPool(const ThreadPool&) = delete; 171 ThreadPool(const ThreadPool&) = delete; 158 ThreadPool(ThreadPool&&) = default; 172 ThreadPool(ThreadPool&&) = default; 159 ThreadPool& operator=(const ThreadPool&) = 173 ThreadPool& operator=(const ThreadPool&) = delete; 160 ThreadPool& operator=(ThreadPool&&) = defa 174 ThreadPool& operator=(ThreadPool&&) = default; 161 175 162 public: 176 public: 163 // Public functions 177 // Public functions 164 size_type initialize_threadpool(size_type) 178 size_type initialize_threadpool(size_type); // start the threads 165 size_type destroy_threadpool(); 179 size_type destroy_threadpool(); // destroy the threads 166 size_type stop_thread(); 180 size_type stop_thread(); 167 181 168 template <typename FuncT> 182 template <typename FuncT> 169 void execute_on_all_threads(FuncT&& _func) 183 void execute_on_all_threads(FuncT&& _func); 170 184 171 template <typename FuncT> 185 template <typename FuncT> 172 void execute_on_specific_threads(const std 186 void execute_on_specific_threads(const std::set<std::thread::id>& _tid, 173 FuncT&& 187 FuncT&& _func); 174 188 175 task_queue_t* get_queue() const { return 189 task_queue_t* get_queue() const { return m_task_queue; } 176 task_queue_t*& get_valid_queue(task_queue_ 190 task_queue_t*& get_valid_queue(task_queue_t*&) const; 177 191 178 bool is_tbb_threadpool() const { return m_ 192 bool is_tbb_threadpool() const { return m_tbb_tp; } 179 193 180 public: 194 public: >> 195 // Public functions related to TBB >> 196 static bool using_tbb(); >> 197 // enable using TBB if available - semi-deprecated >> 198 static void set_use_tbb(bool _v); >> 199 >> 200 /// set the default use of tbb >> 201 static void set_default_use_tbb(bool _v) { set_use_tbb(_v); } >> 202 /// set the default use of cpu affinity >> 203 static void set_default_use_cpu_affinity(bool _v); >> 204 /// set the default scheduling priority of threads in thread-pool >> 205 static void set_default_scheduling_priority(int _v) { f_thread_priority() = _v; } >> 206 /// set the default verbosity >> 207 static void set_default_verbose(int _v) { f_verbose() = _v; } 181 /// set the default pool size 208 /// set the default pool size 182 static void set_default_size(size_type _v) 209 static void set_default_size(size_type _v) { f_default_pool_size() = _v; } 183 210 >> 211 /// get the default use of tbb >> 212 static bool get_default_use_tbb() { return f_use_tbb(); } >> 213 /// get the default use of cpu affinity >> 214 static bool get_default_use_cpu_affinity() { return f_use_cpu_affinity(); } >> 215 /// get the default scheduling priority of threads in thread-pool >> 216 static int get_default_scheduling_priority() { return f_thread_priority(); } >> 217 /// get the default verbosity >> 218 static int get_default_verbose() { return f_verbose(); } 184 /// get the default pool size 219 /// get the default pool size 185 static size_type get_default_size() { retu 220 static size_type get_default_size() { return f_default_pool_size(); } 186 221 187 public: 222 public: 188 // add tasks for threads to process 223 // add tasks for threads to process 189 size_type add_task(task_pointer&& task, in 224 size_type add_task(task_pointer&& task, int bin = -1); 190 // size_type add_thread_task(ThreadId id, 225 // size_type add_thread_task(ThreadId id, task_pointer&& task); 191 // add a generic container with iterator 226 // add a generic container with iterator 192 template <typename ListT> 227 template <typename ListT> 193 size_type add_tasks(ListT&); 228 size_type add_tasks(ListT&); 194 229 195 Thread* get_thread(size_type _n) const; 230 Thread* get_thread(size_type _n) const; 196 Thread* get_thread(std::thread::id id) con 231 Thread* get_thread(std::thread::id id) const; 197 232 198 // only relevant when compiled with PTL_US 233 // only relevant when compiled with PTL_USE_TBB 199 static tbb_global_control_t*& tbb_global_c 234 static tbb_global_control_t*& tbb_global_control(); 200 235 201 void set_initialization(initialize_func_t 236 void set_initialization(initialize_func_t f) { m_init_func = std::move(f); } 202 void set_finalization(finalize_func_t f) { 237 void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); } 203 238 204 void reset_initialization() 239 void reset_initialization() 205 { 240 { 206 m_init_func = []() {}; 241 m_init_func = []() {}; 207 } 242 } 208 void reset_finalization() 243 void reset_finalization() 209 { 244 { 210 m_fini_func = []() {}; 245 m_fini_func = []() {}; 211 } 246 } 212 247 213 public: 248 public: 214 // get the pool state 249 // get the pool state 215 const pool_state_type& state() const { ret 250 const pool_state_type& state() const { return m_pool_state; } 216 // see how many main task threads there ar 251 // see how many main task threads there are 217 size_type size() const { return m_pool_siz 252 size_type size() const { return m_pool_size; } 218 // set the thread pool size 253 // set the thread pool size 219 void resize(size_type _n); 254 void resize(size_type _n); 220 // affinity assigns threads to cores, assi 255 // affinity assigns threads to cores, assignment at constructor 221 bool using_affinity() const { return m_use 256 bool using_affinity() const { return m_use_affinity; } 222 bool is_alive() { return m_alive_flag->loa 257 bool is_alive() { return m_alive_flag->load(); } 223 void notify(); 258 void notify(); 224 void notify_all(); 259 void notify_all(); 225 void notify(size_type); 260 void notify(size_type); 226 bool is_initialized() const; 261 bool is_initialized() const; 227 int get_active_threads_count() const { re 262 int get_active_threads_count() const { return (int)m_thread_awake->load(); } 228 263 229 void set_affinity(affinity_func_t f) { m_a 264 void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); } 230 void set_affinity(intmax_t i, Thread&) con 265 void set_affinity(intmax_t i, Thread&) const; 231 void set_priority(int _prio, Thread&) cons 266 void set_priority(int _prio, Thread&) const; 232 267 233 void set_verbose(int n) { m_verbose = n; } 268 void set_verbose(int n) { m_verbose = n; } 234 int get_verbose() const { return m_verbos 269 int get_verbose() const { return m_verbose; } 235 bool is_main() const { return ThisThread:: 270 bool is_main() const { return ThisThread::get_id() == m_main_tid; } 236 271 237 tbb_task_arena_t* get_task_arena(); 272 tbb_task_arena_t* get_task_arena(); 238 273 239 public: 274 public: 240 // read FORCE_NUM_THREADS environment vari 275 // read FORCE_NUM_THREADS environment variable 241 static const thread_id_map_t& get_thread_i 276 static const thread_id_map_t& get_thread_ids(); 242 static uintmax_t get_thread_i 277 static uintmax_t get_thread_id(ThreadId); 243 static uintmax_t get_this_thr 278 static uintmax_t get_this_thread_id(); 244 static uintmax_t add_thread_i 279 static uintmax_t add_thread_id(ThreadId = ThisThread::get_id()); 245 280 246 private: << 281 protected: 247 void execute_thread(VUserTaskQueue*); // 282 void execute_thread(VUserTaskQueue*); // function thread sits in 248 int insert(task_pointer&&, int = -1); 283 int insert(task_pointer&&, int = -1); 249 int run_on_this(task_pointer&&); 284 int run_on_this(task_pointer&&); 250 285 251 private: << 286 protected: 252 // called in THREAD INIT 287 // called in THREAD INIT 253 static void start_thread(ThreadPool*, thre 288 static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1); 254 289 255 void record_entry(); 290 void record_entry(); 256 void record_exit(); 291 void record_exit(); 257 292 258 private: 293 private: 259 // Private variables 294 // Private variables 260 // random 295 // random 261 bool m_use_affinity = fal 296 bool m_use_affinity = false; 262 bool m_tbb_tp = fal 297 bool m_tbb_tp = false; 263 bool m_delete_task_queue = fal 298 bool m_delete_task_queue = false; 264 int m_verbose = 0; << 299 int m_verbose = f_verbose(); 265 int m_priority = 0; << 300 int m_priority = f_thread_priority(); 266 size_type m_pool_size = 0; 301 size_type m_pool_size = 0; 267 ThreadId m_main_tid = Thi 302 ThreadId m_main_tid = ThisThread::get_id(); 268 atomic_bool_type m_alive_flag = std 303 atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false); 269 pool_state_type m_pool_state = std 304 pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0); 270 atomic_int_type m_thread_awake = std 305 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>(0); 271 atomic_int_type m_thread_active = std 306 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0); 272 307 273 // locks 308 // locks 274 lock_t m_task_lock = std::make_shared<Mute 309 lock_t m_task_lock = std::make_shared<Mutex>(); 275 // conditions 310 // conditions 276 condition_t m_task_cond = std::make_shared 311 condition_t m_task_cond = std::make_shared<Condition>(); 277 312 278 // containers 313 // containers 279 bool_list_t m_is_joined = {}; // joi 314 bool_list_t m_is_joined = {}; // join list 280 bool_list_t m_is_stopped = {}; // let 315 bool_list_t m_is_stopped = {}; // lets thread know to stop 281 thread_list_t m_main_threads = {}; // sto 316 thread_list_t m_main_threads = {}; // storage for active threads 282 thread_list_t m_stop_threads = {}; // sto 317 thread_list_t m_stop_threads = {}; // storage for stopped threads 283 thread_vec_t m_threads = {}; 318 thread_vec_t m_threads = {}; 284 thread_data_t m_thread_data = {}; 319 thread_data_t m_thread_data = {}; 285 320 286 // task queue 321 // task queue 287 task_queue_t* m_task_queue = nullp 322 task_queue_t* m_task_queue = nullptr; 288 tbb_task_arena_t* m_tbb_task_arena = nullp 323 tbb_task_arena_t* m_tbb_task_arena = nullptr; 289 tbb_task_group_t* m_tbb_task_group = nullp 324 tbb_task_group_t* m_tbb_task_group = nullptr; 290 325 291 // functions 326 // functions 292 initialize_func_t m_init_func = initia 327 initialize_func_t m_init_func = initialization_functor(); 293 finalize_func_t m_fini_func = finali 328 finalize_func_t m_fini_func = finalization_functor(); 294 affinity_func_t m_affinity_func = affini 329 affinity_func_t m_affinity_func = affinity_functor(); 295 330 296 private: 331 private: >> 332 static bool& f_use_tbb(); >> 333 static bool& f_use_cpu_affinity(); >> 334 static int& f_thread_priority(); >> 335 static int& f_verbose(); 297 static size_type& f_default_pool_siz 336 static size_type& f_default_pool_size(); 298 static thread_id_map_t& f_thread_ids(); 337 static thread_id_map_t& f_thread_ids(); 299 }; 338 }; 300 339 301 //-------------------------------------------- 340 //--------------------------------------------------------------------------------------// 302 inline void 341 inline void 303 ThreadPool::notify() 342 ThreadPool::notify() 304 { 343 { 305 // wake up one thread that is waiting for 344 // wake up one thread that is waiting for a task to be available 306 if(m_thread_awake->load() < m_pool_size) 345 if(m_thread_awake->load() < m_pool_size) 307 { 346 { 308 AutoLock l(*m_task_lock); 347 AutoLock l(*m_task_lock); 309 m_task_cond->notify_one(); 348 m_task_cond->notify_one(); 310 } 349 } 311 } 350 } 312 //-------------------------------------------- 351 //--------------------------------------------------------------------------------------// 313 inline void 352 inline void 314 ThreadPool::notify_all() 353 ThreadPool::notify_all() 315 { 354 { 316 // wake all threads 355 // wake all threads 317 AutoLock l(*m_task_lock); 356 AutoLock l(*m_task_lock); 318 m_task_cond->notify_all(); 357 m_task_cond->notify_all(); 319 } 358 } 320 //-------------------------------------------- 359 //--------------------------------------------------------------------------------------// 321 inline void 360 inline void 322 ThreadPool::notify(size_type ntasks) 361 ThreadPool::notify(size_type ntasks) 323 { 362 { 324 if(ntasks == 0) 363 if(ntasks == 0) 325 return; 364 return; 326 365 327 // wake up as many threads that tasks just 366 // wake up as many threads that tasks just added 328 if(m_thread_awake->load() < m_pool_size) 367 if(m_thread_awake->load() < m_pool_size) 329 { 368 { 330 AutoLock l(*m_task_lock); 369 AutoLock l(*m_task_lock); 331 if(ntasks < this->size()) 370 if(ntasks < this->size()) 332 { 371 { 333 for(size_type i = 0; i < ntasks; + 372 for(size_type i = 0; i < ntasks; ++i) 334 m_task_cond->notify_one(); 373 m_task_cond->notify_one(); 335 } 374 } 336 else 375 else 337 { 376 { 338 m_task_cond->notify_all(); 377 m_task_cond->notify_all(); 339 } 378 } 340 } 379 } 341 } 380 } 342 //-------------------------------------------- 381 //--------------------------------------------------------------------------------------// 343 // local function for getting the tbb task sch 382 // local function for getting the tbb task scheduler 344 inline tbb_global_control_t*& 383 inline tbb_global_control_t*& 345 ThreadPool::tbb_global_control() 384 ThreadPool::tbb_global_control() 346 { 385 { 347 static thread_local tbb_global_control_t* 386 static thread_local tbb_global_control_t* _instance = nullptr; 348 return _instance; 387 return _instance; 349 } 388 } 350 //-------------------------------------------- 389 //--------------------------------------------------------------------------------------// 351 // task arena 390 // task arena 352 inline tbb_task_arena_t* 391 inline tbb_task_arena_t* 353 ThreadPool::get_task_arena() 392 ThreadPool::get_task_arena() 354 { 393 { 355 #if defined(PTL_USE_TBB) 394 #if defined(PTL_USE_TBB) 356 // create a task arena 395 // create a task arena 357 if(!m_tbb_task_arena) 396 if(!m_tbb_task_arena) 358 { 397 { 359 auto _sz = (tbb_global_control()) 398 auto _sz = (tbb_global_control()) 360 ? tbb_global_control()- 399 ? tbb_global_control()->active_value( 361 tbb::global_contr 400 tbb::global_control::max_allowed_parallelism) 362 : size(); 401 : size(); 363 m_tbb_task_arena = new tbb_task_arena_ 402 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{}); 364 m_tbb_task_arena->initialize(_sz, 1); 403 m_tbb_task_arena->initialize(_sz, 1); 365 } 404 } 366 #else 405 #else 367 if(!m_tbb_task_arena) 406 if(!m_tbb_task_arena) 368 m_tbb_task_arena = new tbb_task_arena_ 407 m_tbb_task_arena = new tbb_task_arena_t{}; 369 #endif 408 #endif 370 return m_tbb_task_arena; 409 return m_tbb_task_arena; 371 } 410 } 372 //-------------------------------------------- 411 //--------------------------------------------------------------------------------------// 373 inline void 412 inline void 374 ThreadPool::resize(size_type _n) 413 ThreadPool::resize(size_type _n) 375 { 414 { 376 initialize_threadpool(_n); 415 initialize_threadpool(_n); 377 if(m_task_queue) 416 if(m_task_queue) 378 m_task_queue->resize(static_cast<intma 417 m_task_queue->resize(static_cast<intmax_t>(_n)); 379 } 418 } 380 //-------------------------------------------- 419 //--------------------------------------------------------------------------------------// 381 inline int 420 inline int 382 ThreadPool::run_on_this(task_pointer&& _task) 421 ThreadPool::run_on_this(task_pointer&& _task) 383 { 422 { 384 auto&& _func = [_task]() { (*_task)(); }; 423 auto&& _func = [_task]() { (*_task)(); }; 385 424 386 if(m_tbb_tp && m_tbb_task_group) 425 if(m_tbb_tp && m_tbb_task_group) 387 { 426 { 388 auto* _arena = get_task_arena(); 427 auto* _arena = get_task_arena(); 389 _arena->execute([this, _func]() { this 428 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); }); 390 } 429 } 391 else 430 else 392 { 431 { 393 _func(); 432 _func(); 394 } 433 } 395 // return the number of tasks added to tas 434 // return the number of tasks added to task-list 396 return 0; 435 return 0; 397 } 436 } 398 //-------------------------------------------- 437 //--------------------------------------------------------------------------------------// 399 inline int 438 inline int 400 ThreadPool::insert(task_pointer&& task, int bi 439 ThreadPool::insert(task_pointer&& task, int bin) 401 { 440 { 402 static thread_local ThreadData* _data = Th 441 static thread_local ThreadData* _data = ThreadData::GetInstance(); 403 442 404 // pass the task to the queue 443 // pass the task to the queue 405 auto ibin = get_valid_queue(m_task_queue)- 444 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin); 406 notify(); 445 notify(); 407 return (int)ibin; 446 return (int)ibin; 408 } 447 } 409 //-------------------------------------------- 448 //--------------------------------------------------------------------------------------// 410 inline ThreadPool::size_type 449 inline ThreadPool::size_type 411 ThreadPool::add_task(task_pointer&& task, int 450 ThreadPool::add_task(task_pointer&& task, int bin) 412 { 451 { 413 // if not native (i.e. TBB) or we haven't 452 // if not native (i.e. TBB) or we haven't built thread-pool, just execute 414 if(m_tbb_tp || !task->is_native_task() || 453 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load()) 415 return static_cast<size_type>(run_on_t 454 return static_cast<size_type>(run_on_this(std::move(task))); 416 455 417 return static_cast<size_type>(insert(std:: 456 return static_cast<size_type>(insert(std::move(task), bin)); 418 } 457 } 419 //-------------------------------------------- 458 //--------------------------------------------------------------------------------------// 420 template <typename ListT> 459 template <typename ListT> 421 inline ThreadPool::size_type 460 inline ThreadPool::size_type 422 ThreadPool::add_tasks(ListT& c) 461 ThreadPool::add_tasks(ListT& c) 423 { 462 { 424 if(!m_alive_flag) // if we haven't built 463 if(!m_alive_flag) // if we haven't built thread-pool, just execute 425 { 464 { 426 for(auto& itr : c) 465 for(auto& itr : c) 427 run(itr); 466 run(itr); 428 c.clear(); 467 c.clear(); 429 return 0; 468 return 0; 430 } 469 } 431 470 432 // TODO: put a limit on how many tasks can 471 // TODO: put a limit on how many tasks can be added at most 433 auto c_size = c.size(); 472 auto c_size = c.size(); 434 for(auto& itr : c) 473 for(auto& itr : c) 435 { 474 { 436 if(!itr->is_native_task()) 475 if(!itr->is_native_task()) 437 --c_size; 476 --c_size; 438 else 477 else 439 { 478 { 440 //++(m_task_queue); 479 //++(m_task_queue); 441 get_valid_queue(m_task_queue)->Ins 480 get_valid_queue(m_task_queue)->InsertTask(itr); 442 } 481 } 443 } 482 } 444 c.clear(); 483 c.clear(); 445 484 446 // notify sleeping threads 485 // notify sleeping threads 447 notify(c_size); 486 notify(c_size); 448 487 449 return c_size; 488 return c_size; 450 } 489 } 451 //-------------------------------------------- 490 //--------------------------------------------------------------------------------------// 452 template <typename FuncT> 491 template <typename FuncT> 453 inline void 492 inline void 454 ThreadPool::execute_on_all_threads(FuncT&& _fu 493 ThreadPool::execute_on_all_threads(FuncT&& _func) 455 { 494 { 456 if(m_tbb_tp && m_tbb_task_group) 495 if(m_tbb_tp && m_tbb_task_group) 457 { 496 { 458 #if defined(PTL_USE_TBB) 497 #if defined(PTL_USE_TBB) 459 // TBB lazily activates threads to pro 498 // TBB lazily activates threads to process tasks and the main thread 460 // participates in processing the task 499 // participates in processing the tasks so getting a specific 461 // function to execute only on the wor 500 // function to execute only on the worker threads requires some trickery 462 // 501 // 463 std::set<std::thread::id> _first{}; 502 std::set<std::thread::id> _first{}; 464 Mutex _mutex{}; 503 Mutex _mutex{}; 465 // init function which executes functi 504 // init function which executes function and returns 1 only once 466 auto _init = [&]() { 505 auto _init = [&]() { 467 int _once = 0; 506 int _once = 0; 468 _mutex.lock(); 507 _mutex.lock(); 469 if(_first.find(std::this_thread::g 508 if(_first.find(std::this_thread::get_id()) == _first.end()) 470 { 509 { 471 // we need to reset this threa 510 // we need to reset this thread-local static for multiple invocations 472 // of the same template instan 511 // of the same template instantiation 473 _once = 1; 512 _once = 1; 474 _first.insert(std::this_thread 513 _first.insert(std::this_thread::get_id()); 475 } 514 } 476 _mutex.unlock(); 515 _mutex.unlock(); 477 if(_once != 0) 516 if(_once != 0) 478 { 517 { 479 _func(); 518 _func(); 480 return 1; 519 return 1; 481 } 520 } 482 return 0; 521 return 0; 483 }; 522 }; 484 // this will collect the number of thr 523 // this will collect the number of threads which have 485 // executed the _init function above 524 // executed the _init function above 486 std::atomic<size_t> _total_init{ 0 }; 525 std::atomic<size_t> _total_init{ 0 }; 487 // max parallelism by TBB 526 // max parallelism by TBB 488 size_t _maxp = tbb_global_control()->a 527 size_t _maxp = tbb_global_control()->active_value( 489 tbb::global_control::max_allowed_p 528 tbb::global_control::max_allowed_parallelism); 490 // create a task arean 529 // create a task arean 491 auto* _arena = get_task_arena(); 530 auto* _arena = get_task_arena(); 492 // size of the thread-pool 531 // size of the thread-pool 493 size_t _sz = size(); 532 size_t _sz = size(); 494 // number of cores 533 // number of cores 495 size_t _ncore = GetNumberOfCores(); << 534 size_t _ncore = Threading::GetNumberOfCores(); 496 // maximum depth for recursion 535 // maximum depth for recursion 497 size_t _dmax = std::max<size_t>(_ncore 536 size_t _dmax = std::max<size_t>(_ncore, 8); 498 // how many threads we need to initial 537 // how many threads we need to initialize 499 size_t _num = std::min(_maxp, std::min 538 size_t _num = std::min(_maxp, std::min(_sz, _ncore)); 500 // this is the task passed to the task 539 // this is the task passed to the task-group 501 std::function<void()> _init_task; 540 std::function<void()> _init_task; 502 _init_task = [&]() { 541 _init_task = [&]() { 503 add_thread_id(); 542 add_thread_id(); 504 static thread_local size_type _dep 543 static thread_local size_type _depth = 0; 505 int _ret 544 int _ret = 0; 506 // don't let the main thread execu 545 // don't let the main thread execute the function 507 if(!is_main()) 546 if(!is_main()) 508 { 547 { 509 // execute the function 548 // execute the function 510 _ret = _init(); 549 _ret = _init(); 511 // add the result 550 // add the result 512 _total_init += _ret; 551 _total_init += _ret; 513 } 552 } 514 // if the function did not return 553 // if the function did not return anything, recursively execute 515 // two more tasks 554 // two more tasks 516 ++_depth; 555 ++_depth; 517 if(_ret == 0 && _depth < _dmax && 556 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num) 518 { 557 { 519 tbb::task_group tg{}; 558 tbb::task_group tg{}; 520 tg.run([&]() { _init_task(); } 559 tg.run([&]() { _init_task(); }); 521 tg.run([&]() { _init_task(); } 560 tg.run([&]() { _init_task(); }); 522 ThisThread::sleep_for(std::chr 561 ThisThread::sleep_for(std::chrono::milliseconds{ 1 }); 523 tg.wait(); 562 tg.wait(); 524 } 563 } 525 --_depth; 564 --_depth; 526 }; 565 }; 527 566 528 // TBB won't oversubscribe so we need 567 // TBB won't oversubscribe so we need to limit by ncores - 1 529 size_t nitr = 0; 568 size_t nitr = 0; 530 auto _fname = __FUNCTION__; 569 auto _fname = __FUNCTION__; 531 auto _write_info = [&]() { 570 auto _write_info = [&]() { 532 std::cout << "[" << _fname << "]> 571 std::cout << "[" << _fname << "]> Total initialized: " << _total_init 533 << ", expected: " << _nu 572 << ", expected: " << _num << ", max-parallel: " << _maxp 534 << ", size: " << _sz << 573 << ", size: " << _sz << ", ncore: " << _ncore << std::endl; 535 }; 574 }; 536 while(_total_init < _num) 575 while(_total_init < _num) 537 { 576 { 538 auto _n = 2 * _num; 577 auto _n = 2 * _num; 539 while(--_n > 0) 578 while(--_n > 0) 540 { 579 { 541 _arena->execute( 580 _arena->execute( 542 [&]() { m_tbb_task_group-> 581 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); }); 543 } 582 } 544 _arena->execute([&]() { m_tbb_task 583 _arena->execute([&]() { m_tbb_task_group->wait(); }); 545 // don't loop infinitely but use a 584 // don't loop infinitely but use a strict condition 546 if(nitr++ > 2 * (_num + 1) && (_to 585 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num) 547 { 586 { 548 _write_info(); 587 _write_info(); 549 break; 588 break; 550 } 589 } 551 // at this point we need to exit 590 // at this point we need to exit 552 if(nitr > 4 * (_ncore + 1)) 591 if(nitr > 4 * (_ncore + 1)) 553 { 592 { 554 _write_info(); 593 _write_info(); 555 break; 594 break; 556 } 595 } 557 } 596 } 558 if(get_verbose() > 3) 597 if(get_verbose() > 3) 559 _write_info(); 598 _write_info(); 560 #endif 599 #endif 561 } 600 } 562 else if(get_queue()) 601 else if(get_queue()) 563 { 602 { 564 get_queue()->ExecuteOnAllThreads(this, 603 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func)); 565 } 604 } 566 } 605 } 567 606 568 //-------------------------------------------- 607 //--------------------------------------------------------------------------------------// 569 608 570 template <typename FuncT> 609 template <typename FuncT> 571 inline void 610 inline void 572 ThreadPool::execute_on_specific_threads(const 611 ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids, 573 FuncT& 612 FuncT&& _func) 574 { 613 { 575 if(m_tbb_tp && m_tbb_task_group) 614 if(m_tbb_tp && m_tbb_task_group) 576 { 615 { 577 #if defined(PTL_USE_TBB) 616 #if defined(PTL_USE_TBB) 578 // TBB lazily activates threads to pro 617 // TBB lazily activates threads to process tasks and the main thread 579 // participates in processing the task 618 // participates in processing the tasks so getting a specific 580 // function to execute only on the wor 619 // function to execute only on the worker threads requires some trickery 581 // 620 // 582 std::set<std::thread::id> _first{}; 621 std::set<std::thread::id> _first{}; 583 Mutex _mutex{}; 622 Mutex _mutex{}; 584 // init function which executes functi 623 // init function which executes function and returns 1 only once 585 auto _exec = [&]() { 624 auto _exec = [&]() { 586 int _once = 0; 625 int _once = 0; 587 _mutex.lock(); 626 _mutex.lock(); 588 if(_first.find(std::this_thread::g 627 if(_first.find(std::this_thread::get_id()) == _first.end()) 589 { 628 { 590 // we need to reset this threa 629 // we need to reset this thread-local static for multiple invocations 591 // of the same template instan 630 // of the same template instantiation 592 _once = 1; 631 _once = 1; 593 _first.insert(std::this_thread 632 _first.insert(std::this_thread::get_id()); 594 } 633 } 595 _mutex.unlock(); 634 _mutex.unlock(); 596 if(_once != 0) 635 if(_once != 0) 597 { 636 { 598 _func(); 637 _func(); 599 return 1; 638 return 1; 600 } 639 } 601 return 0; 640 return 0; 602 }; 641 }; 603 // this will collect the number of thr 642 // this will collect the number of threads which have 604 // executed the _exec function above 643 // executed the _exec function above 605 std::atomic<size_t> _total_exec{ 0 }; 644 std::atomic<size_t> _total_exec{ 0 }; 606 // number of cores 645 // number of cores 607 size_t _ncore = GetNumberOfCores(); << 646 size_t _ncore = Threading::GetNumberOfCores(); 608 // maximum depth for recursion 647 // maximum depth for recursion 609 size_t _dmax = std::max<size_t>(_ncore 648 size_t _dmax = std::max<size_t>(_ncore, 8); 610 // how many threads we need to initial 649 // how many threads we need to initialize 611 size_t _num = _tids.size(); 650 size_t _num = _tids.size(); 612 // create a task arena 651 // create a task arena 613 auto* _arena = get_task_arena(); 652 auto* _arena = get_task_arena(); 614 // this is the task passed to the task 653 // this is the task passed to the task-group 615 std::function<void()> _exec_task; 654 std::function<void()> _exec_task; 616 _exec_task = [&]() { 655 _exec_task = [&]() { 617 add_thread_id(); 656 add_thread_id(); 618 static thread_local size_type _dep 657 static thread_local size_type _depth = 0; 619 int _ret 658 int _ret = 0; 620 auto _thi 659 auto _this_tid = std::this_thread::get_id(); 621 // don't let the main thread execu 660 // don't let the main thread execute the function 622 if(_tids.count(_this_tid) > 0) 661 if(_tids.count(_this_tid) > 0) 623 { 662 { 624 // execute the function 663 // execute the function 625 _ret = _exec(); 664 _ret = _exec(); 626 // add the result 665 // add the result 627 _total_exec += _ret; 666 _total_exec += _ret; 628 } 667 } 629 // if the function did not return 668 // if the function did not return anything, recursively execute 630 // two more tasks 669 // two more tasks 631 ++_depth; 670 ++_depth; 632 if(_ret == 0 && _depth < _dmax && 671 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num) 633 { 672 { 634 tbb::task_group tg{}; 673 tbb::task_group tg{}; 635 tg.run([&]() { _exec_task(); } 674 tg.run([&]() { _exec_task(); }); 636 tg.run([&]() { _exec_task(); } 675 tg.run([&]() { _exec_task(); }); 637 ThisThread::sleep_for(std::chr 676 ThisThread::sleep_for(std::chrono::milliseconds{ 1 }); 638 tg.wait(); 677 tg.wait(); 639 } 678 } 640 --_depth; 679 --_depth; 641 }; 680 }; 642 681 643 // TBB won't oversubscribe so we need 682 // TBB won't oversubscribe so we need to limit by ncores - 1 644 size_t nitr = 0; 683 size_t nitr = 0; 645 auto _fname = __FUNCTION__; 684 auto _fname = __FUNCTION__; 646 auto _write_info = [&]() { 685 auto _write_info = [&]() { 647 std::cout << "[" << _fname << "]> 686 std::cout << "[" << _fname << "]> Total executed: " << _total_exec 648 << ", expected: " << _nu 687 << ", expected: " << _num << ", size: " << size() << std::endl; 649 }; 688 }; 650 while(_total_exec < _num) 689 while(_total_exec < _num) 651 { 690 { 652 auto _n = 2 * _num; 691 auto _n = 2 * _num; 653 while(--_n > 0) 692 while(--_n > 0) 654 { 693 { 655 _arena->execute( 694 _arena->execute( 656 [&]() { m_tbb_task_group-> 695 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); }); 657 } 696 } 658 _arena->execute([&]() { m_tbb_task 697 _arena->execute([&]() { m_tbb_task_group->wait(); }); 659 // don't loop infinitely but use a 698 // don't loop infinitely but use a strict condition 660 if(nitr++ > 2 * (_num + 1) && (_to 699 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num) 661 { 700 { 662 _write_info(); 701 _write_info(); 663 break; 702 break; 664 } 703 } 665 // at this point we need to exit 704 // at this point we need to exit 666 if(nitr > 8 * (_num + 1)) 705 if(nitr > 8 * (_num + 1)) 667 { 706 { 668 _write_info(); 707 _write_info(); 669 break; 708 break; 670 } 709 } 671 } 710 } 672 if(get_verbose() > 3) 711 if(get_verbose() > 3) 673 _write_info(); 712 _write_info(); 674 #endif 713 #endif 675 } 714 } 676 else if(get_queue()) 715 else if(get_queue()) 677 { 716 { 678 get_queue()->ExecuteOnSpecificThreads( 717 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func)); 679 } 718 } 680 } 719 } 681 720 682 //============================================ 721 //======================================================================================// 683 722 684 } // namespace PTL 723 } // namespace PTL 685 724