//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------
// Tasking class header file
//
// Class Description:
//
// This file creates a class for an efficient thread-pool that
// accepts work in the form of tasks.
//
// ---------------------------------------------------------------
// Author: Jonathan Madsen (Feb 13th 2018)
// ---------------------------------------------------------------

#pragma once

#include "PTL/AutoLock.hh"
#ifndef G4GMAKE
#include "PTL/Config.hh"
#endif
#include "PTL/ThreadData.hh"
#include "PTL/Threading.hh"
#include "PTL/Types.hh"
#include "PTL/VTask.hh"
#include "PTL/VUserTaskQueue.hh"

#if defined(PTL_USE_TBB)
#    if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
#        define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
#    endif
#    if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
#        define TBB_PREVIEW_GLOBAL_CONTROL 1
#    endif
#    include <tbb/global_control.h>
#    include <tbb/task_arena.h>
#    include <tbb/task_group.h>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>  // IWYU pragma: keep
#include <set>
#include <thread>
#include <type_traits>  // IWYU pragma: keep
#include <unordered_map>
#include <utility>
#include <vector>

namespace PTL
{
namespace thread_pool
{
namespace state
{
static const short STARTED = 0;
static const short PARTIAL = 1;
static const short STOPPED = 2;
static const short NONINIT = 3;

}  // namespace state
}  // namespace thread_pool

class ThreadPool
{
public:
    template <typename KeyT, typename MappedT, typename HashT = KeyT>
    using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;

    // pod-types
    using size_type        = size_t;
    using task_count_type  = std::shared_ptr<std::atomic_uintmax_t>;
    using atomic_int_type  = std::shared_ptr<std::atomic_uintmax_t>;
    using pool_state_type  = std::shared_ptr<std::atomic_short>;
    using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
    // objects
    using task_type    = VTask;
    using lock_t       = std::shared_ptr<Mutex>;
    using condition_t  = std::shared_ptr<Condition>;
    using task_pointer = std::shared_ptr<task_type>;
    using task_queue_t = VUserTaskQueue;
    // containers
    using thread_list_t      = std::deque<ThreadId>;
    using bool_list_t        = std::vector<bool>;
    using thread_id_map_t    = std::map<ThreadId, uintmax_t>;
    using thread_index_map_t = std::map<uintmax_t, ThreadId>;
    using thread_vec_t       = std::vector<Thread>;
    using thread_data_t      = std::vector<std::shared_ptr<ThreadData>>;
    // functions
    using initialize_func_t = std::function<void()>;
    using finalize_func_t   = std::function<void()>;
    using affinity_func_t   = std::function<intmax_t(intmax_t)>;

    static affinity_func_t& affinity_functor()
    {
        static affinity_func_t _v = [](intmax_t) {
            static std::atomic<intmax_t> assigned;
            intmax_t                     _assign = assigned++;
            return _assign % Thread::hardware_concurrency();
        };
        return _v;
    }

    static initialize_func_t& initialization_functor()
    {
        static initialize_func_t _v = []() {};
        return _v;
    }

    static finalize_func_t& finalization_functor()
    {
        static finalize_func_t _v = []() {};
        return _v;
    }

    struct Config
    {
        bool              init         = true;
        bool              use_tbb      = false;
        bool              use_affinity = false;
        int               verbose      = 0;
        int               priority     = 0;
        size_type         pool_size    = f_default_pool_size();
        VUserTaskQueue*   task_queue   = nullptr;
        affinity_func_t   set_affinity = affinity_functor();
        initialize_func_t initializer  = initialization_functor();
        finalize_func_t   finalizer    = finalization_functor();
    };

public:
    // Constructor and Destructors
    explicit ThreadPool(const Config&);
    ~ThreadPool();
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&)      = default;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = default;

public:
    // Public functions
    size_type initialize_threadpool(size_type);  // start the threads
    size_type destroy_threadpool();              // destroy the threads
    size_type stop_thread();

    template <typename FuncT>
    void execute_on_all_threads(FuncT&& _func);

    template <typename FuncT>
    void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
                                     FuncT&& _func);

    task_queue_t*  get_queue() const { return m_task_queue; }
    task_queue_t*& get_valid_queue(task_queue_t*&) const;

    bool is_tbb_threadpool() const { return m_tbb_tp; }

public:
    /// set the default pool size
    static void set_default_size(size_type _v) { f_default_pool_size() = _v; }

    /// get the default pool size
    static size_type get_default_size() { return f_default_pool_size(); }

public:
    // add tasks for threads to process
    size_type add_task(task_pointer&& task, int bin = -1);
    // size_type add_thread_task(ThreadId id, task_pointer&& task);
    // add a generic container with iterator
    template <typename ListT>
    size_type add_tasks(ListT&);

    Thread* get_thread(size_type _n) const;
    Thread* get_thread(std::thread::id id) const;

    // only relevant when compiled with PTL_USE_TBB
    static tbb_global_control_t*& tbb_global_control();

    void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
    void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }

    void reset_initialization()
    {
        m_init_func = []() {};
    }
    void reset_finalization()
    {
        m_fini_func = []() {};
    }

public:
    // get the pool state
    const pool_state_type& state() const { return m_pool_state; }
    // see how many main task threads there are
    size_type size() const { return m_pool_size; }
    // set the thread pool size
    void resize(size_type _n);
    // affinity assigns threads to cores, assignment at constructor
    bool using_affinity() const { return m_use_affinity; }
    bool is_alive() { return m_alive_flag->load(); }
    void notify();
    void notify_all();
    void notify(size_type);
    bool is_initialized() const;
    int  get_active_threads_count() const { return (int) m_thread_awake->load(); }

    void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
    void set_affinity(intmax_t i, Thread&) const;
    void set_priority(int _prio, Thread&) const;

    void set_verbose(int n) { m_verbose = n; }
    int  get_verbose() const { return m_verbose; }
    bool is_main() const { return ThisThread::get_id() == m_main_tid; }

    tbb_task_arena_t* get_task_arena();

public:
    // read FORCE_NUM_THREADS environment variable
    static const thread_id_map_t& get_thread_ids();
    static uintmax_t              get_thread_id(ThreadId);
    static uintmax_t              get_this_thread_id();
    static uintmax_t              add_thread_id(ThreadId = ThisThread::get_id());

private:
    void execute_thread(VUserTaskQueue*);  // function thread sits in
    int  insert(task_pointer&&, int = -1);
    int  run_on_this(task_pointer&&);

private:
    // called in THREAD INIT
    static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);

    void record_entry();
    void record_exit();

private:
    // Private variables
    // random
    bool             m_use_affinity      = false;
    bool             m_tbb_tp            = false;
    bool             m_delete_task_queue = false;
    int              m_verbose           = 0;
    int              m_priority          = 0;
    size_type        m_pool_size         = 0;
    ThreadId         m_main_tid          = ThisThread::get_id();
    atomic_bool_type m_alive_flag        = std::make_shared<std::atomic_bool>(false);
    pool_state_type  m_pool_state =
        std::make_shared<std::atomic_short>(thread_pool::state::NONINIT);
    atomic_int_type m_thread_awake  = std::make_shared<std::atomic_uintmax_t>(0);
    atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0);

    // locks
    lock_t m_task_lock = std::make_shared<Mutex>();
    // conditions
    condition_t m_task_cond = std::make_shared<Condition>();

    // containers
    bool_list_t   m_is_joined    = {};  // join list
    bool_list_t   m_is_stopped   = {};  // lets thread know to stop
    thread_list_t m_main_threads = {};  // storage for active threads
    thread_list_t m_stop_threads = {};  // storage for stopped threads
    thread_vec_t  m_threads      = {};
    thread_data_t m_thread_data  = {};

    // task queue
    task_queue_t*     m_task_queue     = nullptr;
    tbb_task_arena_t* m_tbb_task_arena = nullptr;
    tbb_task_group_t* m_tbb_task_group = nullptr;

    // functions
    initialize_func_t m_init_func     = initialization_functor();
    finalize_func_t   m_fini_func     = finalization_functor();
    affinity_func_t   m_affinity_func = affinity_functor();

private:
    static size_type&       f_default_pool_size();
    static thread_id_map_t& f_thread_ids();
};
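
//--------------------------------------------------------------------------------------//
// Illustrative sketch (not part of this header, kept preprocessed-out): one way the
// Config declared above could be filled in and handed to the constructor. The field
// names come from ThreadPool::Config; the pool size, the affinity mapping, and the
// state()/destroy_threadpool() calls are arbitrary example choices, not required usage.
#if 0
#include "PTL/ThreadPool.hh"

#include <cstdint>
#include <cstdio>

int main()
{
    PTL::ThreadPool::Config cfg{};
    cfg.init         = true;  // spin up the workers in the constructor
    cfg.pool_size    = 4;     // four worker threads
    cfg.use_affinity = true;  // pin workers to cores via the functor below
    cfg.set_affinity = [](intmax_t i) { return i % 4; };

    PTL::ThreadPool tp{ cfg };
    std::printf("workers: %zu, started: %d\n", tp.size(),
                (int) (tp.state()->load() == PTL::thread_pool::state::STARTED));
    tp.destroy_threadpool();  // explicit teardown (join and discard the workers)
    return 0;
}
#endif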

//--------------------------------------------------------------------------------------//
inline void
ThreadPool::notify()
{
    // wake up one thread that is waiting for a task to be available
    if(m_thread_awake->load() < m_pool_size)
    {
        AutoLock l(*m_task_lock);
        m_task_cond->notify_one();
    }
}
//--------------------------------------------------------------------------------------//
inline void
ThreadPool::notify_all()
{
    // wake all threads
    AutoLock l(*m_task_lock);
    m_task_cond->notify_all();
}
//--------------------------------------------------------------------------------------//
inline void
ThreadPool::notify(size_type ntasks)
{
    if(ntasks == 0)
        return;

    // wake up as many threads as there are tasks just added
    if(m_thread_awake->load() < m_pool_size)
    {
        AutoLock l(*m_task_lock);
        if(ntasks < this->size())
        {
            for(size_type i = 0; i < ntasks; ++i)
                m_task_cond->notify_one();
        }
        else
        {
            m_task_cond->notify_all();
        }
    }
}
//--------------------------------------------------------------------------------------//
// local function for getting the tbb task scheduler
inline tbb_global_control_t*&
ThreadPool::tbb_global_control()
{
    static thread_local tbb_global_control_t* _instance = nullptr;
    return _instance;
}
//--------------------------------------------------------------------------------------//
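// Illustrative sketch (not part of this header), assuming a TBB build (PTL_USE_TBB):
// tbb_global_control() above hands back a reference to a thread-local pointer, so
// calling code can install a tbb::global_control object to cap TBB parallelism before
// creating a TBB-backed pool. Ownership staying with the code that creates the object
// is an assumption of this sketch, not a guarantee made by this header.
#if 0
#if defined(PTL_USE_TBB)
#    include "PTL/ThreadPool.hh"

#    include <tbb/global_control.h>

#    include <cstddef>

void cap_tbb_parallelism(std::size_t nthreads)
{
    auto*& _ctrl = PTL::ThreadPool::tbb_global_control();
    if(!_ctrl)
        _ctrl = new tbb::global_control{ tbb::global_control::max_allowed_parallelism,
                                         nthreads };
}
#endif
#endif
//--------------------------------------------------------------------------------------//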
// task arena
inline tbb_task_arena_t*
ThreadPool::get_task_arena()
{
#if defined(PTL_USE_TBB)
    // create a task arena
    if(!m_tbb_task_arena)
    {
        auto _sz = (tbb_global_control())
                       ? tbb_global_control()->active_value(
                             tbb::global_control::max_allowed_parallelism)
                       : size();
        m_tbb_task_arena = new tbb_task_arena_t{};
        m_tbb_task_arena->initialize(_sz, 1);
    }
#else
    if(!m_tbb_task_arena)
        m_tbb_task_arena = new tbb_task_arena_t{};
#endif
    return m_tbb_task_arena;
}
//--------------------------------------------------------------------------------------//
inline void
ThreadPool::resize(size_type _n)
{
    initialize_threadpool(_n);
    if(m_task_queue)
        m_task_queue->resize(static_cast<intmax_t>(_n));
}
//--------------------------------------------------------------------------------------//
inline int
ThreadPool::run_on_this(task_pointer&& _task)
{
    auto&& _func = [_task]() { (*_task)(); };

    if(m_tbb_tp && m_tbb_task_group)
    {
        auto* _arena = get_task_arena();
        _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
    }
    else
    {
        _func();
    }
    // return the number of tasks added to task-list
    return 0;
}
//--------------------------------------------------------------------------------------//
inline int
ThreadPool::insert(task_pointer&& task, int bin)
{
    static thread_local ThreadData* _data = ThreadData::GetInstance();

    // pass the task to the queue
    auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
    notify();
    return (int) ibin;
}
//--------------------------------------------------------------------------------------//
inline ThreadPool::size_type
ThreadPool::add_task(task_pointer&& task, int bin)
{
    // if not native (i.e. TBB) or we haven't built thread-pool, just execute
    if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
        return static_cast<size_type>(run_on_this(std::move(task)));

    return static_cast<size_type>(insert(std::move(task), bin));
}
//--------------------------------------------------------------------------------------//
template <typename ListT>
inline ThreadPool::size_type
ThreadPool::add_tasks(ListT& c)
{
    if(!m_alive_flag)  // if we haven't built thread-pool, just execute
    {
        for(auto& itr : c)
            run(itr);
        c.clear();
        return 0;
    }

    // TODO: put a limit on how many tasks can be added at most
    auto c_size = c.size();
    for(auto& itr : c)
    {
        if(!itr->is_native_task())
            --c_size;
        else
        {
            //++(m_task_queue);
            get_valid_queue(m_task_queue)->InsertTask(itr);
        }
    }
    c.clear();

    // notify sleeping threads
    notify(c_size);

    return c_size;
}
//--------------------------------------------------------------------------------------//
template <typename FuncT>
inline void
ThreadPool::execute_on_all_threads(FuncT&& _func)
{
    if(m_tbb_tp && m_tbb_task_group)
    {
#if defined(PTL_USE_TBB)
        // TBB lazily activates threads to process tasks and the main thread
        // participates in processing the tasks so getting a specific
        // function to execute only on the worker threads requires some trickery
        //
        std::set<std::thread::id> _first{};
        Mutex                     _mutex{};
        // init function which executes function and returns 1 only once
        auto _init = [&]() {
            int _once = 0;
            _mutex.lock();
            if(_first.find(std::this_thread::get_id()) == _first.end())
            {
                // we need to reset this thread-local static for multiple invocations
                // of the same template instantiation
                _once = 1;
                _first.insert(std::this_thread::get_id());
            }
            _mutex.unlock();
            if(_once != 0)
            {
                _func();
                return 1;
            }
            return 0;
        };
        // this will collect the number of threads which have
        // executed the _init function above
        std::atomic<size_t> _total_init{ 0 };
        // max parallelism by TBB
        size_t _maxp = tbb_global_control()->active_value(
            tbb::global_control::max_allowed_parallelism);
        // create a task arena
        auto* _arena = get_task_arena();
        // size of the thread-pool
        size_t _sz = size();
        // number of cores
        size_t _ncore = GetNumberOfCores();
        // maximum depth for recursion
        size_t _dmax = std::max<size_t>(_ncore, 8);
        // how many threads we need to initialize
        size_t _num = std::min(_maxp, std::min(_sz, _ncore));
        // this is the task passed to the task-group
        std::function<void()> _init_task;
        _init_task = [&]() {
            add_thread_id();
            static thread_local size_type _depth = 0;
            int                           _ret   = 0;
            // don't let the main thread execute the function
            if(!is_main())
            {
                // execute the function
                _ret = _init();
                // add the result
                _total_init += _ret;
            }
            // if the function did not return anything, launch
            // two more tasks
            ++_depth;
            if(_ret == 0 && _depth < _dmax && _total_init < _num)
            {
                tbb::task_group tg{};
                tg.run([&]() { _init_task(); });
                tg.run([&]() { _init_task(); });
                ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
                tg.wait();
            }
            --_depth;
        };

        // TBB won't oversubscribe so we need to limit by ncores - 1
        size_t nitr        = 0;
        auto   _fname      = __FUNCTION__;
        auto   _write_info = [&]() {
            std::cout << "[" << _fname << "]> Total initialized: " << _total_init
                      << ", expected: " << _num << ", max-parallel: " << _maxp
                      << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
        };
        while(_total_init < _num)
        {
            auto _n = 2 * _num;
            while(--_n > 0)
            {
                _arena->execute(
                    [&]() { m_tbb_task_group->run(_init_task); });
            }
            _arena->execute([&]() { m_tbb_task_group->wait(); });
            // don't loop infinitely but use a strict condition
            if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
            {
                _write_info();
                break;
            }
            // at this point we need to exit
            if(nitr > 4 * (_ncore + 1))
            {
                _write_info();
                break;
            }
        }
        if(get_verbose() > 3)
            _write_info();
#endif
    }
    else if(get_queue())
    {
        get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
    }
}

//--------------------------------------------------------------------------------------//
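// Illustrative sketch (not part of this header) of the member defined above: running a
// callable once on every worker thread is typically used for per-thread setup, e.g.
// seeding a thread-local generator. The pool size and the thread-local variable are
// example choices, not part of the ThreadPool interface.
#if 0
#include "PTL/ThreadPool.hh"

#include <atomic>
#include <cstdio>
#include <random>

int main()
{
    static thread_local std::mt19937 rng{};  // one generator per thread
    std::atomic<int>                 seeded{ 0 };

    PTL::ThreadPool::Config cfg{};
    cfg.pool_size = 4;
    PTL::ThreadPool tp{ cfg };

    // run the lambda once on each worker thread
    tp.execute_on_all_threads([&]() {
        rng.seed(std::random_device{}());
        ++seeded;
    });
    std::printf("seeded %d worker generators\n", seeded.load());
    return 0;
}
#endif
//--------------------------------------------------------------------------------------//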

template <typename FuncT>
inline void
ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
                                        FuncT&& _func)
{
    if(m_tbb_tp && m_tbb_task_group)
    {
#if defined(PTL_USE_TBB)
        // TBB lazily activates threads to process tasks and the main thread
        // participates in processing the tasks so getting a specific
        // function to execute only on the worker threads requires some trickery
        //
        std::set<std::thread::id> _first{};
        Mutex                     _mutex{};
        // exec function which executes function and returns 1 only once
        auto _exec = [&]() {
            int _once = 0;
            _mutex.lock();
            if(_first.find(std::this_thread::get_id()) == _first.end())
            {
                // we need to reset this thread-local static for multiple invocations
                // of the same template instantiation
                _once = 1;
                _first.insert(std::this_thread::get_id());
            }
            _mutex.unlock();
            if(_once != 0)
            {
                _func();
                return 1;
            }
            return 0;
        };
        // this will collect the number of threads which have
        // executed the _exec function above
        std::atomic<size_t> _total_exec{ 0 };
        // number of cores
        size_t _ncore = GetNumberOfCores();
        // maximum depth for recursion
        size_t _dmax = std::max<size_t>(_ncore, 8);
        // how many threads the function needs to execute on
        size_t _num = _tids.size();
        // create a task arena
        auto* _arena = get_task_arena();
        // this is the task passed to the task-group
        std::function<void()> _exec_task;
        _exec_task = [&]() {
            add_thread_id();
            static thread_local size_type _depth    = 0;
            int                           _ret      = 0;
            auto                          _this_tid = ThisThread::get_id();
            // only let the requested threads execute the function
            if(_tids.count(_this_tid) > 0)
            {
                // execute the function
                _ret = _exec();
                // add the result
                _total_exec += _ret;
            }
            // if the function did not return anything, launch
            // two more tasks
            ++_depth;
            if(_ret == 0 && _depth < _dmax && _total_exec < _num)
            {
                tbb::task_group tg{};
                tg.run([&]() { _exec_task(); });
                tg.run([&]() { _exec_task(); });
                ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
                tg.wait();
            }
            --_depth;
        };

        // TBB won't oversubscribe so we need to limit by ncores - 1
        size_t nitr        = 0;
        auto   _fname      = __FUNCTION__;
        auto   _write_info = [&]() {
            std::cout << "[" << _fname << "]> Total executed: " << _total_exec
                      << ", expected: " << _num << ", size: " << size() << std::endl;
        };
        while(_total_exec < _num)
        {
            auto _n = 2 * _num;
            while(--_n > 0)
            {
                _arena->execute(
                    [&]() { m_tbb_task_group->run(_exec_task); });
            }
            _arena->execute([&]() { m_tbb_task_group->wait(); });
            // don't loop infinitely but use a strict condition
            if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
            {
                _write_info();
                break;
            }
            // at this point we need to exit
            if(nitr > 8 * (_num + 1))
            {
                _write_info();
                break;
            }
        }
        if(get_verbose() > 3)
            _write_info();
#endif
    }
    else if(get_queue())
    {
        get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
    }
}

//======================================================================================//

}  // namespace PTL
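
//--------------------------------------------------------------------------------------//
// Illustrative sketch (not part of this header) of execute_on_specific_threads defined
// above: the target set is built from get_thread_ids(), which maps each pool ThreadId
// to its sequential index, assuming ThreadId is std::thread::id as used throughout this
// header. Selecting the even-indexed workers is an arbitrary example.
#if 0
#include "PTL/ThreadPool.hh"

#include <cstdio>
#include <set>
#include <thread>

int main()
{
    PTL::ThreadPool::Config cfg{};
    cfg.pool_size = 4;
    PTL::ThreadPool tp{ cfg };

    // pick every worker with an even index
    std::set<std::thread::id> targets{};
    for(const auto& itr : PTL::ThreadPool::get_thread_ids())
        if(itr.second % 2 == 0)
            targets.insert(itr.first);

    tp.execute_on_specific_threads(targets, []() {
        std::printf("hello from a selected worker\n");
    });
    return 0;
}
#endif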