//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------
// Tasking class header file
//
// Class Description:
//
// This file creates a class for an efficient thread-pool that
// accepts work in the form of tasks.
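//
// A minimal usage sketch (illustrative only; the pool size below is an
// arbitrary choice, and the constructor is assumed to start the workers
// when Config::init is true, which is handled in the accompanying
// source file):
//
//     PTL::ThreadPool::Config cfg{};
//     cfg.pool_size = 4;                        // number of worker threads
//     PTL::ThreadPool tp{ cfg };
//
//     tp.execute_on_all_threads([]() { /* per-thread initialization */ });
//     // ... submit work, typically through PTL::TaskGroup / TaskManager ...
//     tp.destroy_threadpool();                  // join and discard the workers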
//
// ---------------------------------------------------------------
// Author: Jonathan Madsen (Feb 13th 2018)
// ---------------------------------------------------------------

#pragma once

#include "PTL/AutoLock.hh"
#ifndef G4GMAKE
#include "PTL/Config.hh"
#endif
#include "PTL/ThreadData.hh"
#include "PTL/Threading.hh"
#include "PTL/Types.hh"
#include "PTL/VTask.hh"
#include "PTL/VUserTaskQueue.hh"

#if defined(PTL_USE_TBB)
#    if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
#        define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
#    endif
#    if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
#        define TBB_PREVIEW_GLOBAL_CONTROL 1
#    endif
#    include <tbb/global_control.h>
#    include <tbb/task_arena.h>
#    include <tbb/task_group.h>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>  // IWYU pragma: keep
#include <set>
#include <thread>
#include <type_traits>  // IWYU pragma: keep
#include <unordered_map>
#include <utility>
#include <vector>

namespace PTL
{
namespace thread_pool
{
namespace state
{
static const short STARTED = 0;
static const short PARTIAL = 1;
static const short STOPPED = 2;
static const short NONINIT = 3;

}  // namespace state
}  // namespace thread_pool

class ThreadPool
{
public:
    template <typename KeyT, typename MappedT, typename HashT = KeyT>
    using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;

    // pod-types
    using size_type        = size_t;
    using task_count_type  = std::shared_ptr<std::atomic_uintmax_t>;
    using atomic_int_type  = std::shared_ptr<std::atomic_uintmax_t>;
    using pool_state_type  = std::shared_ptr<std::atomic_short>;
    using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
    // objects
    using task_type   = VTask;
    using lock_t      = std::shared_ptr<Mutex>;
    using condition_t = std::shared_ptr<Condition>;
    using task_pointer = std::shared_ptr<task_type>;
    using task_queue_t = VUserTaskQueue;
    // containers
    using thread_list_t      = std::deque<ThreadId>;
    using bool_list_t        = std::vector<bool>;
    using thread_id_map_t    = std::map<ThreadId, uintmax_t>;
    using thread_index_map_t = std::map<uintmax_t, ThreadId>;
    using thread_vec_t       = std::vector<Thread>;
    using thread_data_t      = std::vector<std::shared_ptr<ThreadData>>;
    // functions
    using initialize_func_t = std::function<void()>;
    using finalize_func_t   = std::function<void()>;
    using affinity_func_t   = std::function<intmax_t(intmax_t)>;

    static affinity_func_t& affinity_functor()
    {
        static affinity_func_t _v = [](intmax_t) {
            static std::atomic<intmax_t> assigned;
            intmax_t                     _assign = assigned++;
            return _assign % Thread::hardware_concurrency();
        };
        return _v;
    }

    static initialize_func_t& initialization_functor()
    {
        static initialize_func_t _v = []() {};
        return _v;
    }

    static finalize_func_t& finalization_functor()
    {
        static finalize_func_t _v = []() {};
        return _v;
    }

    struct Config
    {
        bool              init         = true;
        bool              use_tbb      = false;
        bool              use_affinity = false;
        int               verbose      = 0;
        int               priority     = 0;
        size_type         pool_size    = f_default_pool_size();
        VUserTaskQueue*   task_queue   = nullptr;
        affinity_func_t   set_affinity = affinity_functor();
        initialize_func_t initializer  = initialization_functor();
        finalize_func_t   finalizer    = finalization_functor();
    };

public:
    // Constructor and Destructors
    explicit ThreadPool(const Config&);
    ~ThreadPool();
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&)      = default;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = default;

public:
    // Public functions
    size_type initialize_threadpool(size_type);  // start the threads
    size_type destroy_threadpool();              // destroy the threads
    size_type stop_thread();

    template <typename FuncT>
    void execute_on_all_threads(FuncT&& _func);

    template <typename FuncT>
    void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
                                     FuncT&& _func);

    task_queue_t*  get_queue() const { return m_task_queue; }
    task_queue_t*& get_valid_queue(task_queue_t*&) const;

    bool is_tbb_threadpool() const { return m_tbb_tp; }

public:
    /// set the default pool size
    static void set_default_size(size_type _v) { f_default_pool_size() = _v; }

    /// get the default pool size
    static size_type get_default_size() { return f_default_pool_size(); }

public:
    // add tasks for threads to process
    size_type add_task(task_pointer&& task, int bin = -1);
    // size_type add_thread_task(ThreadId id, task_pointer&& task);
    // add a generic container with iterator
    template <typename ListT>
    size_type add_tasks(ListT&);

    Thread* get_thread(size_type _n) const;
    Thread* get_thread(std::thread::id id) const;

    // only relevant when compiled with PTL_USE_TBB
    static tbb_global_control_t*& tbb_global_control();

    void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
    void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }

    void reset_initialization()
    {
        m_init_func = []() {};
    }
    void reset_finalization()
    {
        m_fini_func = []() {};
    }

public:
    // get the pool state
    const pool_state_type& state() const { return m_pool_state; }
    // see how many main task threads there are
    size_type size() const { return m_pool_size; }
    // set the thread pool size
    void resize(size_type _n);
    // affinity assigns threads to cores, assignment at constructor
    bool using_affinity() const { return m_use_affinity; }
    bool is_alive() { return m_alive_flag->load(); }
    void notify();
    void notify_all();
    void notify(size_type);
    bool is_initialized() const;
    int  get_active_threads_count() const { return (int)m_thread_awake->load(); }

    void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
    void set_affinity(intmax_t i, Thread&) const;
    void set_priority(int _prio, Thread&) const;

    void set_verbose(int n) { m_verbose = n; }
    int  get_verbose() const { return m_verbose; }
    bool is_main() const { return ThisThread::get_id() == m_main_tid; }

    tbb_task_arena_t* get_task_arena();

public:
    // read FORCE_NUM_THREADS environment variable
    static const thread_id_map_t& get_thread_ids();
    static uintmax_t              get_thread_id(ThreadId);
    static uintmax_t              get_this_thread_id();
    static uintmax_t              add_thread_id(ThreadId = ThisThread::get_id());

private:
    void execute_thread(VUserTaskQueue*);  // function thread sits in
    int  insert(task_pointer&&, int = -1);
    int  run_on_this(task_pointer&&);

private:
    // called in THREAD INIT
    static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);

    void record_entry();
    void record_exit();

private:
    // Private variables
    // random
    bool             m_use_affinity      = false;
    bool             m_tbb_tp            = false;
    bool             m_delete_task_queue = false;
    int              m_verbose           = 0;
    int              m_priority          = 0;
    size_type        m_pool_size         = 0;
    ThreadId         m_main_tid          = ThisThread::get_id();
    atomic_bool_type m_alive_flag        = std::make_shared<std::atomic_bool>(false);
    pool_state_type  m_pool_state        = std::make_shared<std::atomic_short>(0);
    atomic_int_type  m_thread_awake      = std::make_shared<std::atomic_uintmax_t>(0);
    atomic_int_type  m_thread_active     = std::make_shared<std::atomic_uintmax_t>(0);

    // locks
    lock_t m_task_lock = std::make_shared<Mutex>();
    // conditions
    condition_t m_task_cond = std::make_shared<Condition>();

    // containers
    bool_list_t   m_is_joined    = {};  // join list
    bool_list_t   m_is_stopped   = {};  // lets thread know to stop
    thread_list_t m_main_threads = {};  // storage for active threads
    thread_list_t m_stop_threads = {};  // storage for stopped threads
    thread_vec_t  m_threads      = {};
    thread_data_t m_thread_data  = {};

    // task queue
    task_queue_t*     m_task_queue     = nullptr;
    tbb_task_arena_t* m_tbb_task_arena = nullptr;
    tbb_task_group_t* m_tbb_task_group = nullptr;

    // functions
    initialize_func_t m_init_func     = initialization_functor();
    finalize_func_t   m_fini_func     = finalization_functor();
    affinity_func_t   m_affinity_func = affinity_functor();

private:
    static size_type&       f_default_pool_size();
    static thread_id_map_t& f_thread_ids();
};

//--------------------------------------------------------------------------------------//
inline void
ThreadPool::notify()
{
    // wake up one thread that is waiting for a task to be available
    if(m_thread_awake->load() < m_pool_size)
    {
        AutoLock l(*m_task_lock);
        m_task_cond->notify_one();
    }
}
//--------------------------------------------------------------------------------------//
inline void
ThreadPool::notify_all()
{
    // wake all threads
    AutoLock l(*m_task_lock);
    m_task_cond->notify_all();
}
//--------------------------------------------------------------------------------------//
inline void
ThreadPool::notify(size_type ntasks)
{
    if(ntasks == 0)
        return;

    // wake up as many threads that tasks just added
    if(m_thread_awake->load() < m_pool_size)
    {
        AutoLock l(*m_task_lock);
        if(ntasks < this->size())
        {
            for(size_type i = 0; i < ntasks; ++i)
                m_task_cond->notify_one();
        }
        else
        {
            m_task_cond->notify_all();
        }
    }
}
//--------------------------------------------------------------------------------------//
// local function for getting the tbb task scheduler
inline tbb_global_control_t*&
ThreadPool::tbb_global_control()
{
    static thread_local tbb_global_control_t* _instance = nullptr;
    return _instance;
}
//--------------------------------------------------------------------------------------//
// task arena
inline tbb_task_arena_t*
ThreadPool::get_task_arena()
{
#if defined(PTL_USE_TBB)
    // create a task arena
    if(!m_tbb_task_arena)
    {
        auto _sz = (tbb_global_control())
                       ? tbb_global_control()->active_value(
                             tbb::global_control::max_allowed_parallelism)
                       : size();
        m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
        m_tbb_task_arena->initialize(_sz, 1);
    }
#else
    if(!m_tbb_task_arena)
        m_tbb_task_arena = new tbb_task_arena_t{};
#endif
    return m_tbb_task_arena;
}
//--------------------------------------------------------------------------------------//
inline void
ThreadPool::resize(size_type _n)
{
    initialize_threadpool(_n);
    if(m_task_queue)
        m_task_queue->resize(static_cast<intmax_t>(_n));
}
//--------------------------------------------------------------------------------------//
inline int
ThreadPool::run_on_this(task_pointer&& _task)
{
    auto&& _func = [_task]() { (*_task)(); };

    if(m_tbb_tp && m_tbb_task_group)
    {
        auto* _arena = get_task_arena();
        _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
    }
    else
    {
        _func();
    }
    // return the number of tasks added to task-list
    return 0;
}
//--------------------------------------------------------------------------------------//
inline int
ThreadPool::insert(task_pointer&& task, int bin)
{
    static thread_local ThreadData* _data = ThreadData::GetInstance();
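    // _data is the caller's thread-local context; the queue implementation may
    // consult it when selecting a bin if one is not given explicitly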

    // pass the task to the queue
    auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
    notify();
    return (int)ibin;
}
//--------------------------------------------------------------------------------------//
inline ThreadPool::size_type
ThreadPool::add_task(task_pointer&& task, int bin)
{
    // if not native (i.e. TBB) or we haven't built thread-pool, just execute
    if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
        return static_cast<size_type>(run_on_this(std::move(task)));

    return static_cast<size_type>(insert(std::move(task), bin));
}
//--------------------------------------------------------------------------------------//
template <typename ListT>
inline ThreadPool::size_type
ThreadPool::add_tasks(ListT& c)
{
    if(!m_alive_flag)  // if we haven't built thread-pool, just execute
    {
        for(auto& itr : c)
            run(itr);
        c.clear();
        return 0;
    }

    // TODO: put a limit on how many tasks can be added at most
    auto c_size = c.size();
    for(auto& itr : c)
    {
        if(!itr->is_native_task())
            --c_size;
        else
        {
            //++(m_task_queue);
            get_valid_queue(m_task_queue)->InsertTask(itr);
        }
    }
    c.clear();

    // notify sleeping threads
    notify(c_size);

    return c_size;
}
//--------------------------------------------------------------------------------------//
template <typename FuncT>
inline void
ThreadPool::execute_on_all_threads(FuncT&& _func)
{
    if(m_tbb_tp && m_tbb_task_group)
    {
#if defined(PTL_USE_TBB)
        // TBB lazily activates threads to process tasks and the main thread
        // participates in processing the tasks so getting a specific
        // function to execute only on the worker threads requires some trickery
        //
        std::set<std::thread::id> _first{};
        Mutex                     _mutex{};
        // init function which executes function and returns 1 only once
        auto _init = [&]() {
            int _once = 0;
            _mutex.lock();
            if(_first.find(std::this_thread::get_id()) == _first.end())
            {
                // we need to reset this thread-local static for multiple invocations
                // of the same template instantiation
                _once = 1;
                _first.insert(std::this_thread::get_id());
            }
            _mutex.unlock();
            if(_once != 0)
            {
                _func();
                return 1;
            }
            return 0;
        };
        // this will collect the number of threads which have
        // executed the _init function above
        std::atomic<size_t> _total_init{ 0 };
        // max parallelism by TBB
        size_t _maxp = tbb_global_control()->active_value(
            tbb::global_control::max_allowed_parallelism);
        // create a task arena
        auto* _arena = get_task_arena();
        // size of the thread-pool
        size_t _sz = size();
        // number of cores
        size_t _ncore = GetNumberOfCores();
        // maximum depth for recursion
        size_t _dmax = std::max<size_t>(_ncore, 4);
        // how many threads we need to initialize
        size_t _num = std::min(_maxp, std::min(_sz, _ncore));
        // this is the task passed to the task-group
        std::function<void()> _init_task;
        _init_task = [&]() {
            add_thread_id();
            static thread_local size_type _depth = 0;
            int                            _ret   = 0;
            // don't let the main thread execute the function
            if(!is_main())
            {
                // execute the function
                _ret = _init();
                // add the result
                _total_init += _ret;
            }
            // if the function did not return anything, recursively execute
            // two more tasks
            ++_depth;
            if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
            {
                tbb::task_group tg{};
                tg.run([&]() { _init_task(); });
                tg.run([&]() { _init_task(); });
                ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
                tg.wait();
            }
            --_depth;
        };

        // TBB won't oversubscribe so we need to limit by ncores - 1
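        // keep feeding _init_task into the arena until every worker has run
        // _func once; the iteration counters below bound the retries so the
        // loop cannot spin forever if TBB provides fewer workers than expected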
        size_t nitr        = 0;
        auto   _fname      = __FUNCTION__;
        auto   _write_info = [&]() {
            std::cout << "[" << _fname << "]> Total initialized: " << _total_init
                      << ", expected: " << _num << ", max-parallel: " << _maxp
                      << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
        };
        while(_total_init < _num)
        {
            auto _n = 2 * _num;
            while(--_n > 0)
            {
                _arena->execute(
                    [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
            }
            _arena->execute([&]() { m_tbb_task_group->wait(); });
            // don't loop infinitely but use a strict condition
            if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
            {
                _write_info();
                break;
            }
            // at this point we need to exit
            if(nitr > 4 * (_ncore + 1))
            {
                _write_info();
                break;
            }
        }
        if(get_verbose() > 3)
            _write_info();
#endif
    }
    else if(get_queue())
    {
        get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
    }
}

//--------------------------------------------------------------------------------------//

template <typename FuncT>
inline void
ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
                                        FuncT&& _func)
{
    if(m_tbb_tp && m_tbb_task_group)
    {
#if defined(PTL_USE_TBB)
        // TBB lazily activates threads to process tasks and the main thread
        // participates in processing the tasks so getting a specific
        // function to execute only on the worker threads requires some trickery
        //
        std::set<std::thread::id> _first{};
        Mutex                     _mutex{};
        // init function which executes function and returns 1 only once
        auto _exec = [&]() {
            int _once = 0;
            _mutex.lock();
            if(_first.find(std::this_thread::get_id()) == _first.end())
            {
                // we need to reset this thread-local static for multiple invocations
                // of the same template instantiation
                _once = 1;
                _first.insert(std::this_thread::get_id());
            }
            _mutex.unlock();
            if(_once != 0)
            {
                _func();
                return 1;
            }
            return 0;
        };
        // this will collect the number of threads which have
        // executed the _exec function above
        std::atomic<size_t> _total_exec{ 0 };
        // number of cores
        size_t _ncore = GetNumberOfCores();
        // maximum depth for recursion
        size_t _dmax = std::max<size_t>(_ncore, 4);
        // how many threads we need to initialize
        size_t _num = _tids.size();
        // create a task arena
        auto* _arena = get_task_arena();
        // this is the task passed to the task-group
        std::function<void()> _exec_task;
        _exec_task = [&]() {
            add_thread_id();
            static thread_local size_type _depth    = 0;
            int                            _ret      = 0;
            auto                           _this_tid = std::this_thread::get_id();
            // don't let the main thread execute the function
            if(_tids.count(_this_tid) > 0)
            {
                // execute the function
                _ret = _exec();
                // add the result
                _total_exec += _ret;
            }
            // if the function did not return anything, recursively execute
            // two more tasks
            ++_depth;
            if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
            {
                tbb::task_group tg{};
                tg.run([&]() { _exec_task(); });
                tg.run([&]() { _exec_task(); });
                ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
                tg.wait();
            }
            --_depth;
        };

        // TBB won't oversubscribe so we need to limit by ncores - 1
        size_t nitr        = 0;
        auto   _fname      = __FUNCTION__;
        auto   _write_info = [&]() {
            std::cout << "[" << _fname << "]> Total executed: " << _total_exec
                      << ", expected: " << _num << ", size: " << size() << std::endl;
        };
        while(_total_exec < _num)
        {
            auto _n = 2 * _num;
            while(--_n > 0)
            {
                _arena->execute(
                    [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
            }
            _arena->execute([&]() { m_tbb_task_group->wait(); });
            // don't loop infinitely but use a strict condition
            if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
            {
                _write_info();
                break;
            }
            // at this point we need to exit
            if(nitr > 8 * (_num + 1))
            {
                _write_info();
                break;
            }
        }
        if(get_verbose() > 3)
            _write_info();
#endif
    }
    else if(get_queue())
    {
        get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
    }
}

//======================================================================================//

}  // namespace PTL