Geant4 Cross Reference |
1 // 1 // 2 // MIT License 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentati 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, includ 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distr 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit perso 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the followin 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permiss 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the S 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 18 // 19 // 19 // 20 // ------------------------------------------- 20 // --------------------------------------------------------------- 21 // Tasking class header file 21 // Tasking class header file 22 // 22 // 23 // Class Description: 23 // Class Description: 24 // 24 // 25 // This file creates the a class for handling 25 // This file creates the a class for handling a group of tasks that 26 // can be independently joined 26 // can be independently joined 27 // 27 // 28 // ------------------------------------------- 28 // --------------------------------------------------------------- 29 // Author: Jonathan Madsen (Feb 13th 2018) 29 // Author: Jonathan Madsen (Feb 13th 2018) 30 // ------------------------------------------- 30 // --------------------------------------------------------------- 31 31 32 #pragma once 32 #pragma once 33 33 34 #include "PTL/AutoLock.hh" 34 #include "PTL/AutoLock.hh" 35 #ifndef G4GMAKE 35 #ifndef G4GMAKE 36 #include "" 36 #include "" 37 #endif 37 #endif >> 38 #include "PTL/Globals.hh" 38 #include "PTL/JoinFunction.hh" 39 #include "PTL/JoinFunction.hh" 39 #include "PTL/ScopeDestructor.hh" << 40 #include "PTL/Task.hh" 40 #include "PTL/Task.hh" 41 #include "PTL/ThreadData.hh" 41 #include "PTL/ThreadData.hh" 42 #include "PTL/ThreadPool.hh" 42 #include "PTL/ThreadPool.hh" 43 #include "PTL/Types.hh" << 43 #include "PTL/Threading.hh" >> 44 #include "PTL/Utility.hh" 44 #include "PTL/VTask.hh" 45 #include "PTL/VTask.hh" 45 #include "PTL/VUserTaskQueue.hh" 46 #include "PTL/VUserTaskQueue.hh" 46 #include "PTL/detail/CxxBackports.hh" << 47 47 48 #include <atomic> 48 #include <atomic> 49 #include <chrono> 49 #include <chrono> 50 #include <cstdint> 50 #include <cstdint> 51 #include <cstdio> 51 #include <cstdio> 52 #include <functional> 52 #include <functional> 53 #include <future> 53 #include <future> 54 #include <iostream> 54 #include <iostream> 55 #include <memory> 55 #include <memory> 56 #include <mutex> 56 #include <mutex> 57 #include <sstream> // IWYU pragma: keep << 58 #include <stdexcept> 57 #include <stdexcept> 59 #include <thread> 58 #include <thread> 60 #include <type_traits> 59 #include <type_traits> 61 #include <utility> 60 #include <utility> 62 #include <vector> 61 #include <vector> 63 62 64 #if defined(PTL_USE_TBB) 63 #if defined(PTL_USE_TBB) 65 # include <tbb/task_group.h> // IWYU pragm << 64 # include <tbb/task_group.h> 66 #endif 65 #endif 67 66 68 namespace PTL 67 namespace PTL 69 { 68 { 70 namespace internal 69 namespace internal 71 { 70 { 72 std::atomic_uintmax_t& 71 std::atomic_uintmax_t& 73 task_group_counter(); 72 task_group_counter(); 74 73 75 ThreadPool* 74 ThreadPool* 76 get_default_threadpool(); 75 get_default_threadpool(); 77 76 78 intmax_t 77 intmax_t 79 get_task_depth(); 78 get_task_depth(); 80 } // namespace internal 79 } // namespace internal 81 80 82 template <typename Tp, typename Arg = Tp, intm 81 template <typename Tp, typename Arg = Tp, intmax_t MaxDepth = 0> 83 class TaskGroup 82 class TaskGroup 84 { 83 { 85 public: 84 public: 86 //---------------------------------------- 85 //------------------------------------------------------------------------// 87 template <typename Up> 86 template <typename Up> 88 using container_type = std::vector<Up>; 87 using container_type = std::vector<Up>; 89 88 90 using tid_type = std::thread 89 using tid_type = std::thread::id; 91 using size_type = uintmax_t; 90 using size_type = uintmax_t; 92 using lock_t = Mutex; 91 using lock_t = Mutex; 93 using atomic_int = std::atomic 92 using atomic_int = std::atomic_intmax_t; 94 using atomic_uint = std::atomic 93 using atomic_uint = std::atomic_uintmax_t; 95 using condition_t = Condition; 94 using condition_t = Condition; 96 using ArgTp = decay_t<Arg 95 using ArgTp = decay_t<Arg>; 97 using result_type = Tp; 96 using result_type = Tp; 98 using task_pointer = std::shared 97 using task_pointer = std::shared_ptr<TaskFuture<ArgTp>>; 99 using task_list_t = container_t 98 using task_list_t = container_type<task_pointer>; 100 using this_type = TaskGroup<T 99 using this_type = TaskGroup<Tp, Arg, MaxDepth>; 101 using promise_type = std::promis 100 using promise_type = std::promise<ArgTp>; 102 using future_type = std::future 101 using future_type = std::future<ArgTp>; 103 using packaged_task_type = std::packag 102 using packaged_task_type = std::packaged_task<ArgTp()>; 104 using future_list_t = container_t 103 using future_list_t = container_type<future_type>; 105 using join_type = typename Jo 104 using join_type = typename JoinFunction<Tp, Arg>::Type; 106 using iterator = typename fu 105 using iterator = typename future_list_t::iterator; 107 using reverse_iterator = typename fu 106 using reverse_iterator = typename future_list_t::reverse_iterator; 108 using const_iterator = typename fu 107 using const_iterator = typename future_list_t::const_iterator; 109 using const_reverse_iterator = typename fu 108 using const_reverse_iterator = typename future_list_t::const_reverse_iterator; 110 //---------------------------------------- 109 //------------------------------------------------------------------------// 111 template <typename... Args> 110 template <typename... Args> 112 using task_type = Task<ArgTp, decay_t<Args 111 using task_type = Task<ArgTp, decay_t<Args>...>; 113 //---------------------------------------- 112 //------------------------------------------------------------------------// 114 113 115 public: 114 public: 116 // Constructor 115 // Constructor 117 template <typename Func> 116 template <typename Func> 118 TaskGroup(Func&& _join, ThreadPool* _tp = 117 TaskGroup(Func&& _join, ThreadPool* _tp = internal::get_default_threadpool()); 119 118 120 template <typename Up = Tp> 119 template <typename Up = Tp> 121 TaskGroup(ThreadPool* _tp = internal::get_ 120 TaskGroup(ThreadPool* _tp = internal::get_default_threadpool(), 122 enable_if_t<std::is_void<Up>::va 121 enable_if_t<std::is_void<Up>::value, int> = 0); 123 122 124 // Destructor 123 // Destructor 125 ~TaskGroup(); 124 ~TaskGroup(); 126 125 127 // delete copy-construct 126 // delete copy-construct 128 TaskGroup(const this_type&) = delete; 127 TaskGroup(const this_type&) = delete; 129 // define move-construct 128 // define move-construct 130 // NOLINTNEXTLINE(performance-noexcept-mov 129 // NOLINTNEXTLINE(performance-noexcept-move-constructor) 131 TaskGroup(this_type&& rhs) = default; 130 TaskGroup(this_type&& rhs) = default; 132 // delete copy-assign 131 // delete copy-assign 133 TaskGroup& operator=(const this_type& rhs) 132 TaskGroup& operator=(const this_type& rhs) = delete; 134 // define move-assign 133 // define move-assign 135 // NOLINTNEXTLINE(performance-noexcept-mov 134 // NOLINTNEXTLINE(performance-noexcept-move-constructor) 136 TaskGroup& operator=(this_type&& rhs) = de 135 TaskGroup& operator=(this_type&& rhs) = default; 137 136 138 public: 137 public: 139 template <typename Up> 138 template <typename Up> 140 std::shared_ptr<Up> operator+=(std::shared 139 std::shared_ptr<Up> operator+=(std::shared_ptr<Up>&& _task); 141 140 142 // wait to finish 141 // wait to finish 143 void wait(); 142 void wait(); 144 143 145 // increment (prefix) 144 // increment (prefix) 146 intmax_t operator++() { return ++(m_tot_ta 145 intmax_t operator++() { return ++(m_tot_task_count); } 147 intmax_t operator++(int) { return (m_tot_t 146 intmax_t operator++(int) { return (m_tot_task_count)++; } 148 intmax_t operator--() { return --(m_tot_ta 147 intmax_t operator--() { return --(m_tot_task_count); } 149 intmax_t operator--(int) { return (m_tot_t 148 intmax_t operator--(int) { return (m_tot_task_count)--; } 150 149 151 // size 150 // size 152 intmax_t size() const { return m_tot_task_ 151 intmax_t size() const { return m_tot_task_count.load(); } 153 152 154 // get the locks/conditions 153 // get the locks/conditions 155 lock_t& task_lock() { return m_task_l 154 lock_t& task_lock() { return m_task_lock; } 156 condition_t& task_cond() { return m_task_c 155 condition_t& task_cond() { return m_task_cond; } 157 156 158 // identifier 157 // identifier 159 uintmax_t id() const { return m_id; } 158 uintmax_t id() const { return m_id; } 160 159 161 // thread pool 160 // thread pool 162 void set_pool(ThreadPool* tp) { m_ 161 void set_pool(ThreadPool* tp) { m_pool = tp; } 163 ThreadPool*& pool() { return m_pool; } 162 ThreadPool*& pool() { return m_pool; } 164 ThreadPool* pool() const { return m_pool; 163 ThreadPool* pool() const { return m_pool; } 165 164 166 bool is_native_task_group() const { return 165 bool is_native_task_group() const { return (m_tbb_task_group) == nullptr; } 167 bool is_main() const { return this_tid() = 166 bool is_main() const { return this_tid() == m_main_tid; } 168 167 169 // check if any tasks are still pending 168 // check if any tasks are still pending 170 intmax_t pending() { return m_tot_task_cou 169 intmax_t pending() { return m_tot_task_count.load(); } 171 170 172 static void set_verbose(int level) { f_ver 171 static void set_verbose(int level) { f_verbose = level; } 173 172 174 ScopeDestructor get_scope_destructor(); 173 ScopeDestructor get_scope_destructor(); 175 174 176 void notify(); 175 void notify(); 177 void notify_all(); 176 void notify_all(); 178 177 179 void reserve(size_t _n) 178 void reserve(size_t _n) 180 { 179 { 181 m_task_list.reserve(_n); 180 m_task_list.reserve(_n); 182 m_future_list.reserve(_n); 181 m_future_list.reserve(_n); 183 } 182 } 184 183 185 public: 184 public: 186 template <typename Func, typename... Args> 185 template <typename Func, typename... Args> 187 std::shared_ptr<task_type<Args...>> wrap(F 186 std::shared_ptr<task_type<Args...>> wrap(Func func, Args... args) 188 { 187 { 189 return operator+=(std::make_shared<tas 188 return operator+=(std::make_shared<task_type<Args...>>( 190 is_native_task_group(), m_depth, s 189 is_native_task_group(), m_depth, std::move(func), std::move(args)...)); 191 } 190 } 192 191 193 template <typename Func, typename... Args, 192 template <typename Func, typename... Args, typename Up = Tp> 194 enable_if_t<std::is_void<Up>::value, void> 193 enable_if_t<std::is_void<Up>::value, void> exec(Func func, Args... args); 195 194 196 template <typename Func, typename... Args, 195 template <typename Func, typename... Args, typename Up = Tp> 197 enable_if_t<!std::is_void<Up>::value, void 196 enable_if_t<!std::is_void<Up>::value, void> exec(Func func, Args... args); 198 197 199 template <typename Func, typename... Args> 198 template <typename Func, typename... Args> 200 void run(Func func, Args... args) 199 void run(Func func, Args... args) 201 { 200 { 202 exec(std::move(func), std::move(args). 201 exec(std::move(func), std::move(args)...); 203 } 202 } 204 203 205 protected: 204 protected: 206 template <typename Up, typename Func, type 205 template <typename Up, typename Func, typename... Args> 207 enable_if_t<std::is_void<Up>::value, void> 206 enable_if_t<std::is_void<Up>::value, void> local_exec(Func func, Args... args); 208 207 209 template <typename Up, typename Func, type 208 template <typename Up, typename Func, typename... Args> 210 enable_if_t<!std::is_void<Up>::value, void 209 enable_if_t<!std::is_void<Up>::value, void> local_exec(Func func, Args... args); 211 210 212 // shorter typedefs 211 // shorter typedefs 213 using itr_t = iterator; 212 using itr_t = iterator; 214 using citr_t = const_iterator; 213 using citr_t = const_iterator; 215 using ritr_t = reverse_iterator; 214 using ritr_t = reverse_iterator; 216 using critr_t = const_reverse_iterator; 215 using critr_t = const_reverse_iterator; 217 216 218 public: 217 public: 219 //---------------------------------------- 218 //------------------------------------------------------------------------// 220 // Get tasks with non-void return types 219 // Get tasks with non-void return types 221 // 220 // 222 future_list_t& get_tasks() { return 221 future_list_t& get_tasks() { return m_future_list; } 223 const future_list_t& get_tasks() const { r 222 const future_list_t& get_tasks() const { return m_future_list; } 224 223 225 //---------------------------------------- 224 //------------------------------------------------------------------------// 226 // iterate over tasks with return type 225 // iterate over tasks with return type 227 // 226 // 228 itr_t begin() { return m_future_list.beg 227 itr_t begin() { return m_future_list.begin(); } 229 itr_t end() { return m_future_list.end() 228 itr_t end() { return m_future_list.end(); } 230 citr_t begin() const { return m_future_li 229 citr_t begin() const { return m_future_list.begin(); } 231 citr_t end() const { return m_future_list 230 citr_t end() const { return m_future_list.end(); } 232 citr_t cbegin() const { return m_future_l 231 citr_t cbegin() const { return m_future_list.begin(); } 233 citr_t cend() const { return m_future_lis 232 citr_t cend() const { return m_future_list.end(); } 234 ritr_t rbegin() { return m_future_list.rb 233 ritr_t rbegin() { return m_future_list.rbegin(); } 235 ritr_t rend() { return m_future_list.rend 234 ritr_t rend() { return m_future_list.rend(); } 236 critr_t rbegin() const { return m_future_l 235 critr_t rbegin() const { return m_future_list.rbegin(); } 237 critr_t rend() const { return m_future_lis 236 critr_t rend() const { return m_future_list.rend(); } 238 237 239 //---------------------------------------- 238 //------------------------------------------------------------------------// 240 // wait to finish 239 // wait to finish 241 template <typename Up = Tp, enable_if_t<!s 240 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value, int> = 0> 242 inline Up join(Up accum = {}); 241 inline Up join(Up accum = {}); 243 //---------------------------------------- 242 //------------------------------------------------------------------------// 244 // wait to finish 243 // wait to finish 245 template <typename Up = Tp, typename Rp = 244 template <typename Up = Tp, typename Rp = Arg, 246 enable_if_t<std::is_void<Up>::va 245 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int> = 0> 247 inline void join(); 246 inline void join(); 248 //---------------------------------------- 247 //------------------------------------------------------------------------// 249 // wait to finish 248 // wait to finish 250 template <typename Up = Tp, typename Rp = 249 template <typename Up = Tp, typename Rp = Arg, 251 enable_if_t<std::is_void<Up>::va 250 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int> = 0> 252 inline void join(); 251 inline void join(); 253 //---------------------------------------- 252 //------------------------------------------------------------------------// 254 // clear the task result history 253 // clear the task result history 255 void clear(); 254 void clear(); 256 255 257 protected: 256 protected: 258 //---------------------------------------- 257 //------------------------------------------------------------------------// 259 // get the thread id 258 // get the thread id 260 static tid_type this_tid() { return std::t 259 static tid_type this_tid() { return std::this_thread::get_id(); } 261 260 262 //---------------------------------------- 261 //------------------------------------------------------------------------// 263 // get the task count 262 // get the task count 264 atomic_int& task_count() { return m_ 263 atomic_int& task_count() { return m_tot_task_count; } 265 const atomic_int& task_count() const { ret 264 const atomic_int& task_count() const { return m_tot_task_count; } 266 265 267 protected: 266 protected: 268 static int f_verbose; 267 static int f_verbose; 269 // Private variables 268 // Private variables 270 uintmax_t m_id = internal::t 269 uintmax_t m_id = internal::task_group_counter()++; 271 intmax_t m_depth = internal::g 270 intmax_t m_depth = internal::get_task_depth(); 272 tid_type m_main_tid = std::this_t 271 tid_type m_main_tid = std::this_thread::get_id(); 273 atomic_int m_tot_task_count{ 0 }; 272 atomic_int m_tot_task_count{ 0 }; 274 lock_t m_task_lock = {}; 273 lock_t m_task_lock = {}; 275 condition_t m_task_cond = {}; 274 condition_t m_task_cond = {}; 276 join_type m_join{}; 275 join_type m_join{}; 277 ThreadPool* m_pool = inter 276 ThreadPool* m_pool = internal::get_default_threadpool(); 278 tbb_task_group_t* m_tbb_task_group = nullp 277 tbb_task_group_t* m_tbb_task_group = nullptr; 279 task_list_t m_task_list = {}; 278 task_list_t m_task_list = {}; 280 future_list_t m_future_list = {}; 279 future_list_t m_future_list = {}; 281 280 282 private: 281 private: 283 void internal_update(); 282 void internal_update(); 284 }; 283 }; 285 284 286 } // namespace PTL 285 } // namespace PTL 287 namespace PTL 286 namespace PTL 288 { 287 { 289 template <typename Tp, typename Arg, intmax_t 288 template <typename Tp, typename Arg, intmax_t MaxDepth> 290 template <typename Func> 289 template <typename Func> 291 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& 290 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& _join, ThreadPool* _tp) 292 : m_join{ std::forward<Func>(_join) } 291 : m_join{ std::forward<Func>(_join) } 293 , m_pool{ _tp } 292 , m_pool{ _tp } 294 { 293 { 295 internal_update(); 294 internal_update(); 296 } 295 } 297 296 298 template <typename Tp, typename Arg, intmax_t 297 template <typename Tp, typename Arg, intmax_t MaxDepth> 299 template <typename Up> 298 template <typename Up> 300 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Thread 299 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(ThreadPool* _tp, 301 enable 300 enable_if_t<std::is_void<Up>::value, int>) 302 : m_join{ []() {} } 301 : m_join{ []() {} } 303 , m_pool{ _tp } 302 , m_pool{ _tp } 304 { 303 { 305 internal_update(); 304 internal_update(); 306 } 305 } 307 306 308 // Destructor 307 // Destructor 309 template <typename Tp, typename Arg, intmax_t 308 template <typename Tp, typename Arg, intmax_t MaxDepth> 310 TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup() 309 TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup() 311 { 310 { 312 { 311 { 313 // task will decrement counter and the 312 // task will decrement counter and then acquire the lock to notify 314 // condition variable so acquiring loc 313 // condition variable so acquiring lock here will prevent the 315 // task group from being destroyed bef 314 // task group from being destroyed before this is completed 316 AutoLock _lk{ m_task_lock, std::defer_ 315 AutoLock _lk{ m_task_lock, std::defer_lock }; 317 if(!_lk.owns_lock()) 316 if(!_lk.owns_lock()) 318 _lk.lock(); 317 _lk.lock(); 319 } 318 } 320 319 321 if(m_tbb_task_group) 320 if(m_tbb_task_group) 322 { 321 { 323 auto* _arena = m_pool->get_task_arena( 322 auto* _arena = m_pool->get_task_arena(); 324 _arena->execute([this]() { this->m_tbb 323 _arena->execute([this]() { this->m_tbb_task_group->wait(); }); 325 } 324 } 326 delete m_tbb_task_group; 325 delete m_tbb_task_group; 327 this->clear(); 326 this->clear(); 328 } 327 } 329 328 330 template <typename Tp, typename Arg, intmax_t 329 template <typename Tp, typename Arg, intmax_t MaxDepth> 331 template <typename Up> 330 template <typename Up> 332 std::shared_ptr<Up> 331 std::shared_ptr<Up> 333 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std:: 332 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std::shared_ptr<Up>&& _task) 334 { 333 { 335 // thread-safe increment of tasks in task 334 // thread-safe increment of tasks in task group 336 operator++(); 335 operator++(); 337 // copy the shared pointer to abstract ins 336 // copy the shared pointer to abstract instance 338 m_task_list.push_back(_task); 337 m_task_list.push_back(_task); 339 // return the derived instance 338 // return the derived instance 340 return std::move(_task); 339 return std::move(_task); 341 } 340 } 342 341 343 template <typename Tp, typename Arg, intmax_t 342 template <typename Tp, typename Arg, intmax_t MaxDepth> 344 void 343 void 345 TaskGroup<Tp, Arg, MaxDepth>::wait() 344 TaskGroup<Tp, Arg, MaxDepth>::wait() 346 { 345 { 347 auto _dtor = ScopeDestructor{ [&]() { 346 auto _dtor = ScopeDestructor{ [&]() { 348 if(m_tbb_task_group) 347 if(m_tbb_task_group) 349 { 348 { 350 auto* _arena = m_pool->get_task_ar 349 auto* _arena = m_pool->get_task_arena(); 351 _arena->execute([this]() { this->m 350 _arena->execute([this]() { this->m_tbb_task_group->wait(); }); 352 } 351 } 353 } }; 352 } }; 354 353 355 ThreadData* data = ThreadData::GetInstance 354 ThreadData* data = ThreadData::GetInstance(); 356 if(!data) 355 if(!data) 357 return; 356 return; 358 357 359 // if no pool was initially present at cre 358 // if no pool was initially present at creation 360 if(!m_pool) 359 if(!m_pool) 361 { 360 { 362 // check for master MT run-manager 361 // check for master MT run-manager 363 m_pool = internal::get_default_threadp 362 m_pool = internal::get_default_threadpool(); 364 363 365 // if no thread pool created 364 // if no thread pool created 366 if(!m_pool) 365 if(!m_pool) 367 { 366 { 368 if(f_verbose > 0) 367 if(f_verbose > 0) 369 { 368 { 370 fprintf(stderr, "%s @ %i :: Wa 369 fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n", 371 __FUNCTION__, __LINE__ 370 __FUNCTION__, __LINE__, static_cast<void*>(m_pool)); 372 std::cerr << __FUNCTION__ << " 371 std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! " 373 << "nullptr to threa 372 << "nullptr to thread pool!" << std::endl; 374 } 373 } 375 return; 374 return; 376 } 375 } 377 } 376 } 378 377 379 ThreadPool* tpool = (m_pool) ? m_pool 378 ThreadPool* tpool = (m_pool) ? m_pool : data->thread_pool; 380 VUserTaskQueue* taskq = (tpool) ? tpool->g 379 VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue; 381 380 382 bool _is_main = data->is_main; 381 bool _is_main = data->is_main; 383 bool _within_task = data->within_task; 382 bool _within_task = data->within_task; 384 383 385 auto is_active_state = [&]() { 384 auto is_active_state = [&]() { 386 return (tpool->state()->load(std::memo 385 return (tpool->state()->load(std::memory_order_relaxed) != 387 thread_pool::state::STOPPED); 386 thread_pool::state::STOPPED); 388 }; 387 }; 389 388 390 auto execute_this_threads_tasks = [&]() { 389 auto execute_this_threads_tasks = [&]() { 391 if(!taskq) 390 if(!taskq) 392 return; 391 return; 393 392 394 // only want to process if within a ta 393 // only want to process if within a task 395 if((!_is_main || tpool->size() < 2) && 394 if((!_is_main || tpool->size() < 2) && _within_task) 396 { 395 { 397 int bin = static_cast<int>(taskq-> 396 int bin = static_cast<int>(taskq->GetThreadBin()); 398 // const auto nitr = (tpool) ? tpo 397 // const auto nitr = (tpool) ? tpool->size() : 399 // Thread::hardware_concurrency(); 398 // Thread::hardware_concurrency(); 400 while(this->pending() > 0) 399 while(this->pending() > 0) 401 { 400 { 402 if(!taskq->empty()) 401 if(!taskq->empty()) 403 { 402 { 404 auto _task = taskq->GetTas 403 auto _task = taskq->GetTask(bin); 405 if(_task) 404 if(_task) 406 (*_task)(); 405 (*_task)(); 407 } 406 } 408 } 407 } 409 } 408 } 410 }; 409 }; 411 410 412 // checks for validity 411 // checks for validity 413 if(!is_native_task_group()) 412 if(!is_native_task_group()) 414 { 413 { 415 // for external threads 414 // for external threads 416 if(!_is_main || tpool->size() < 2) 415 if(!_is_main || tpool->size() < 2) 417 return; 416 return; 418 } 417 } 419 else if(f_verbose > 0) 418 else if(f_verbose > 0) 420 { 419 { 421 if(!tpool || !taskq) 420 if(!tpool || !taskq) 422 { 421 { 423 // something is wrong, didn't crea 422 // something is wrong, didn't create thread-pool? 424 fprintf(stderr, 423 fprintf(stderr, 425 "%s @ %i :: Warning! nullp 424 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue " 426 "(%p)\n", 425 "(%p)\n", 427 __FUNCTION__, __LINE__, st 426 __FUNCTION__, __LINE__, static_cast<void*>(tpool), 428 static_cast<void*>(taskq)) 427 static_cast<void*>(taskq)); 429 } 428 } 430 // return if thread pool isn't built 429 // return if thread pool isn't built 431 else if(is_native_task_group() && !tpo 430 else if(is_native_task_group() && !tpool->is_alive()) 432 { 431 { 433 fprintf(stderr, "%s @ %i :: Warnin 432 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n", 434 __FUNCTION__, __LINE__); 433 __FUNCTION__, __LINE__); 435 } 434 } 436 else if(!is_active_state()) 435 else if(!is_active_state()) 437 { 436 { 438 fprintf(stderr, "%s @ %i :: Warnin 437 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n", 439 __FUNCTION__, __LINE__); 438 __FUNCTION__, __LINE__); 440 } 439 } 441 } 440 } 442 441 443 intmax_t wake_size = 2; 442 intmax_t wake_size = 2; 444 AutoLock _lock(m_task_lock, std::defer_loc 443 AutoLock _lock(m_task_lock, std::defer_lock); 445 444 446 while(is_active_state()) 445 while(is_active_state()) 447 { 446 { 448 execute_this_threads_tasks(); 447 execute_this_threads_tasks(); 449 448 450 // while loop protects against spuriou 449 // while loop protects against spurious wake-ups 451 while(_is_main && pending() > 0 && is_ 450 while(_is_main && pending() > 0 && is_active_state()) 452 { 451 { 453 // auto _wake = [&]() { return (wa 452 // auto _wake = [&]() { return (wake_size > pending() || 454 // !is_active_state()); 453 // !is_active_state()); 455 // }; 454 // }; 456 455 457 // lock before sleeping on conditi 456 // lock before sleeping on condition 458 if(!_lock.owns_lock()) 457 if(!_lock.owns_lock()) 459 _lock.lock(); 458 _lock.lock(); 460 459 461 // Wait until signaled that a task 460 // Wait until signaled that a task has been competed 462 // Unlock mutex while wait, then l 461 // Unlock mutex while wait, then lock it back when signaled 463 // when true, this wakes the threa 462 // when true, this wakes the thread 464 if(pending() >= wake_size) 463 if(pending() >= wake_size) 465 { 464 { 466 m_task_cond.wait(_lock); 465 m_task_cond.wait(_lock); 467 } 466 } 468 else 467 else 469 { 468 { 470 m_task_cond.wait_for(_lock, st 469 m_task_cond.wait_for(_lock, std::chrono::microseconds(100)); 471 } 470 } 472 // unlock 471 // unlock 473 if(_lock.owns_lock()) 472 if(_lock.owns_lock()) 474 _lock.unlock(); 473 _lock.unlock(); 475 } 474 } 476 475 477 // if pending is not greater than zero 476 // if pending is not greater than zero, we are joined 478 if(pending() <= 0) 477 if(pending() <= 0) 479 break; 478 break; 480 } 479 } 481 480 482 if(_lock.owns_lock()) 481 if(_lock.owns_lock()) 483 _lock.unlock(); 482 _lock.unlock(); 484 483 485 intmax_t ntask = this->task_count().load() 484 intmax_t ntask = this->task_count().load(); 486 if(ntask > 0) 485 if(ntask > 0) 487 { 486 { 488 std::stringstream ss; 487 std::stringstream ss; 489 ss << "\nWarning! Join operation issue 488 ss << "\nWarning! Join operation issue! " << ntask << " tasks still " 490 << "are running!" << std::endl; 489 << "are running!" << std::endl; 491 std::cerr << ss.str(); 490 std::cerr << ss.str(); 492 this->wait(); 491 this->wait(); 493 } 492 } 494 } 493 } 495 494 496 template <typename Tp, typename Arg, intmax_t 495 template <typename Tp, typename Arg, intmax_t MaxDepth> 497 ScopeDestructor 496 ScopeDestructor 498 TaskGroup<Tp, Arg, MaxDepth>::get_scope_destru 497 TaskGroup<Tp, Arg, MaxDepth>::get_scope_destructor() 499 { 498 { 500 auto& _counter = m_tot_task_count; 499 auto& _counter = m_tot_task_count; 501 auto& _task_cond = task_cond(); 500 auto& _task_cond = task_cond(); 502 auto& _task_lock = task_lock(); 501 auto& _task_lock = task_lock(); 503 return ScopeDestructor{ [&_task_cond, &_ta 502 return ScopeDestructor{ [&_task_cond, &_task_lock, &_counter]() { 504 auto _count = --(_counter); 503 auto _count = --(_counter); 505 if(_count < 1) 504 if(_count < 1) 506 { 505 { 507 AutoLock _lk{ _task_lock }; 506 AutoLock _lk{ _task_lock }; 508 _task_cond.notify_all(); 507 _task_cond.notify_all(); 509 } 508 } 510 } }; 509 } }; 511 } 510 } 512 511 513 template <typename Tp, typename Arg, intmax_t 512 template <typename Tp, typename Arg, intmax_t MaxDepth> 514 void 513 void 515 TaskGroup<Tp, Arg, MaxDepth>::notify() 514 TaskGroup<Tp, Arg, MaxDepth>::notify() 516 { 515 { 517 AutoLock _lk{ m_task_lock }; 516 AutoLock _lk{ m_task_lock }; 518 m_task_cond.notify_one(); 517 m_task_cond.notify_one(); 519 } 518 } 520 519 521 template <typename Tp, typename Arg, intmax_t 520 template <typename Tp, typename Arg, intmax_t MaxDepth> 522 void 521 void 523 TaskGroup<Tp, Arg, MaxDepth>::notify_all() 522 TaskGroup<Tp, Arg, MaxDepth>::notify_all() 524 { 523 { 525 AutoLock _lk{ m_task_lock }; 524 AutoLock _lk{ m_task_lock }; 526 m_task_cond.notify_all(); 525 m_task_cond.notify_all(); 527 } 526 } 528 527 529 template <typename Tp, typename Arg, intmax_t 528 template <typename Tp, typename Arg, intmax_t MaxDepth> 530 template <typename Func, typename... Args, typ 529 template <typename Func, typename... Args, typename Up> 531 enable_if_t<std::is_void<Up>::value, void> 530 enable_if_t<std::is_void<Up>::value, void> 532 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, 531 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args) 533 { 532 { 534 if(MaxDepth > 0 && !m_tbb_task_group && Th 533 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() && 535 ThreadData::GetInstance()->task_depth > 534 ThreadData::GetInstance()->task_depth > MaxDepth) 536 { 535 { 537 local_exec<Tp>(std::move(func), std::m 536 local_exec<Tp>(std::move(func), std::move(args)...); 538 } 537 } 539 else 538 else 540 { 539 { 541 auto& _counter = m_tot_task_count; 540 auto& _counter = m_tot_task_count; 542 auto& _task_cond = task_cond(); 541 auto& _task_cond = task_cond(); 543 auto& _task_lock = task_lock(); 542 auto& _task_lock = task_lock(); 544 auto _task = wrap([&_task_cond, 543 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() { 545 auto* _tdata = ThreadData::GetInst 544 auto* _tdata = ThreadData::GetInstance(); 546 if(_tdata) 545 if(_tdata) 547 ++(_tdata->task_depth); 546 ++(_tdata->task_depth); 548 func(args...); 547 func(args...); 549 auto _count = --(_counter); 548 auto _count = --(_counter); 550 if(_tdata) 549 if(_tdata) 551 --(_tdata->task_depth); 550 --(_tdata->task_depth); 552 if(_count < 1) 551 if(_count < 1) 553 { 552 { 554 AutoLock _lk{ _task_lock }; 553 AutoLock _lk{ _task_lock }; 555 _task_cond.notify_all(); 554 _task_cond.notify_all(); 556 } 555 } 557 }); 556 }); 558 557 559 if(m_tbb_task_group) 558 if(m_tbb_task_group) 560 { 559 { 561 auto* _arena = m_pool->ge 560 auto* _arena = m_pool->get_task_arena(); 562 auto* _tbb_task_group = m_tbb_task 561 auto* _tbb_task_group = m_tbb_task_group; 563 auto* _ptask = _task.get( 562 auto* _ptask = _task.get(); 564 _arena->execute([_tbb_task_group, 563 _arena->execute([_tbb_task_group, _ptask]() { 565 _tbb_task_group->run([_ptask]( 564 _tbb_task_group->run([_ptask]() { (*_ptask)(); }); 566 }); 565 }); 567 } 566 } 568 else 567 else 569 { 568 { 570 m_pool->add_task(std::move(_task)) 569 m_pool->add_task(std::move(_task)); 571 } 570 } 572 } 571 } 573 } 572 } 574 template <typename Tp, typename Arg, intmax_t 573 template <typename Tp, typename Arg, intmax_t MaxDepth> 575 template <typename Func, typename... Args, typ 574 template <typename Func, typename... Args, typename Up> 576 enable_if_t<!std::is_void<Up>::value, void> 575 enable_if_t<!std::is_void<Up>::value, void> 577 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, 576 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args) 578 { 577 { 579 if(MaxDepth > 0 && !m_tbb_task_group && Th 578 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() && 580 ThreadData::GetInstance()->task_depth > 579 ThreadData::GetInstance()->task_depth > MaxDepth) 581 { 580 { 582 local_exec<Tp>(std::move(func), std::m 581 local_exec<Tp>(std::move(func), std::move(args)...); 583 } 582 } 584 else 583 else 585 { 584 { 586 auto& _counter = m_tot_task_count; 585 auto& _counter = m_tot_task_count; 587 auto& _task_cond = task_cond(); 586 auto& _task_cond = task_cond(); 588 auto& _task_lock = task_lock(); 587 auto& _task_lock = task_lock(); 589 auto _task = wrap([&_task_cond, 588 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() { 590 auto* _tdata = ThreadData::GetInst 589 auto* _tdata = ThreadData::GetInstance(); 591 if(_tdata) 590 if(_tdata) 592 ++(_tdata->task_depth); 591 ++(_tdata->task_depth); 593 auto&& _ret = func(args...); 592 auto&& _ret = func(args...); 594 auto _count = --(_counter); 593 auto _count = --(_counter); 595 if(_tdata) 594 if(_tdata) 596 --(_tdata->task_depth); 595 --(_tdata->task_depth); 597 if(_count < 1) 596 if(_count < 1) 598 { 597 { 599 AutoLock _lk{ _task_lock }; 598 AutoLock _lk{ _task_lock }; 600 _task_cond.notify_all(); 599 _task_cond.notify_all(); 601 } 600 } 602 return std::forward<decltype(_ret) 601 return std::forward<decltype(_ret)>(_ret); 603 }); 602 }); 604 603 605 if(m_tbb_task_group) 604 if(m_tbb_task_group) 606 { 605 { 607 auto* _arena = m_pool->ge 606 auto* _arena = m_pool->get_task_arena(); 608 auto* _tbb_task_group = m_tbb_task 607 auto* _tbb_task_group = m_tbb_task_group; 609 auto* _ptask = _task.get( 608 auto* _ptask = _task.get(); 610 _arena->execute([_tbb_task_group, 609 _arena->execute([_tbb_task_group, _ptask]() { 611 _tbb_task_group->run([_ptask]( 610 _tbb_task_group->run([_ptask]() { (*_ptask)(); }); 612 }); 611 }); 613 } 612 } 614 else 613 else 615 { 614 { 616 m_pool->add_task(std::move(_task)) 615 m_pool->add_task(std::move(_task)); 617 } 616 } 618 } 617 } 619 } 618 } 620 619 621 template <typename Tp, typename Arg, intmax_t 620 template <typename Tp, typename Arg, intmax_t MaxDepth> 622 template <typename Up, typename Func, typename 621 template <typename Up, typename Func, typename... Args> 623 enable_if_t<std::is_void<Up>::value, void> 622 enable_if_t<std::is_void<Up>::value, void> 624 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func 623 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args) 625 { 624 { 626 auto* _tdata = ThreadData::GetInstance(); 625 auto* _tdata = ThreadData::GetInstance(); 627 if(_tdata) 626 if(_tdata) 628 ++(_tdata->task_depth); 627 ++(_tdata->task_depth); 629 promise_type _p{}; 628 promise_type _p{}; 630 m_future_list.emplace_back(_p.get_future() 629 m_future_list.emplace_back(_p.get_future()); 631 func(args...); 630 func(args...); 632 _p.set_value(); 631 _p.set_value(); 633 if(_tdata) 632 if(_tdata) 634 --(_tdata->task_depth); 633 --(_tdata->task_depth); 635 } 634 } 636 635 637 template <typename Tp, typename Arg, intmax_t 636 template <typename Tp, typename Arg, intmax_t MaxDepth> 638 template <typename Up, typename Func, typename 637 template <typename Up, typename Func, typename... Args> 639 enable_if_t<!std::is_void<Up>::value, void> 638 enable_if_t<!std::is_void<Up>::value, void> 640 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func 639 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args) 641 { 640 { 642 auto* _tdata = ThreadData::GetInstance(); 641 auto* _tdata = ThreadData::GetInstance(); 643 if(_tdata) 642 if(_tdata) 644 ++(_tdata->task_depth); 643 ++(_tdata->task_depth); 645 promise_type _p{}; 644 promise_type _p{}; 646 m_future_list.emplace_back(_p.get_future() 645 m_future_list.emplace_back(_p.get_future()); 647 _p.set_value(func(args...)); 646 _p.set_value(func(args...)); 648 if(_tdata) 647 if(_tdata) 649 --(_tdata->task_depth); 648 --(_tdata->task_depth); 650 } 649 } 651 650 652 template <typename Tp, typename Arg, intmax_t 651 template <typename Tp, typename Arg, intmax_t MaxDepth> 653 template <typename Up, enable_if_t<!std::is_vo 652 template <typename Up, enable_if_t<!std::is_void<Up>::value, int>> 654 inline Up 653 inline Up 655 TaskGroup<Tp, Arg, MaxDepth>::join(Up accum) 654 TaskGroup<Tp, Arg, MaxDepth>::join(Up accum) 656 { 655 { 657 this->wait(); 656 this->wait(); 658 for(auto& itr : m_task_list) 657 for(auto& itr : m_task_list) 659 { 658 { 660 using RetT = decay_t<decltype(itr->get 659 using RetT = decay_t<decltype(itr->get())>; 661 accum = std::move(m_join(std::ref 660 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get()))); 662 } 661 } 663 for(auto& itr : m_future_list) 662 for(auto& itr : m_future_list) 664 { 663 { 665 using RetT = decay_t<decltype(itr.get( 664 using RetT = decay_t<decltype(itr.get())>; 666 accum = std::move(m_join(std::ref 665 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get()))); 667 } 666 } 668 this->clear(); 667 this->clear(); 669 return accum; 668 return accum; 670 } 669 } 671 670 672 template <typename Tp, typename Arg, intmax_t 671 template <typename Tp, typename Arg, intmax_t MaxDepth> 673 template <typename Up, typename Rp, 672 template <typename Up, typename Rp, 674 enable_if_t<std::is_void<Up>::value 673 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int>> 675 inline void 674 inline void 676 TaskGroup<Tp, Arg, MaxDepth>::join() 675 TaskGroup<Tp, Arg, MaxDepth>::join() 677 { 676 { 678 this->wait(); 677 this->wait(); 679 for(auto& itr : m_task_list) 678 for(auto& itr : m_task_list) 680 itr->get(); 679 itr->get(); 681 for(auto& itr : m_future_list) 680 for(auto& itr : m_future_list) 682 itr.get(); 681 itr.get(); 683 m_join(); 682 m_join(); 684 this->clear(); 683 this->clear(); 685 } 684 } 686 685 687 template <typename Tp, typename Arg, intmax_t 686 template <typename Tp, typename Arg, intmax_t MaxDepth> 688 template <typename Up, typename Rp, 687 template <typename Up, typename Rp, 689 enable_if_t<std::is_void<Up>::value 688 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int>> 690 inline void 689 inline void 691 TaskGroup<Tp, Arg, MaxDepth>::join() 690 TaskGroup<Tp, Arg, MaxDepth>::join() 692 { 691 { 693 this->wait(); 692 this->wait(); 694 for(auto& itr : m_task_list) 693 for(auto& itr : m_task_list) 695 { 694 { 696 using RetT = decay_t<decltype(itr->get 695 using RetT = decay_t<decltype(itr->get())>; 697 m_join(std::forward<RetT>(itr->get())) 696 m_join(std::forward<RetT>(itr->get())); 698 } 697 } 699 for(auto& itr : m_future_list) 698 for(auto& itr : m_future_list) 700 { 699 { 701 using RetT = decay_t<decltype(itr.get( 700 using RetT = decay_t<decltype(itr.get())>; 702 m_join(std::forward<RetT>(itr.get())); 701 m_join(std::forward<RetT>(itr.get())); 703 } 702 } 704 this->clear(); 703 this->clear(); 705 } 704 } 706 705 707 template <typename Tp, typename Arg, intmax_t 706 template <typename Tp, typename Arg, intmax_t MaxDepth> 708 void 707 void 709 TaskGroup<Tp, Arg, MaxDepth>::clear() 708 TaskGroup<Tp, Arg, MaxDepth>::clear() 710 { 709 { 711 m_future_list.clear(); 710 m_future_list.clear(); 712 m_task_list.clear(); 711 m_task_list.clear(); 713 } 712 } 714 713 715 template <typename Tp, typename Arg, intmax_t 714 template <typename Tp, typename Arg, intmax_t MaxDepth> 716 void 715 void 717 TaskGroup<Tp, Arg, MaxDepth>::internal_update( 716 TaskGroup<Tp, Arg, MaxDepth>::internal_update() 718 { 717 { 719 if(!m_pool) 718 if(!m_pool) 720 m_pool = internal::get_default_threadp 719 m_pool = internal::get_default_threadpool(); 721 720 722 if(!m_pool) 721 if(!m_pool) 723 { 722 { 724 std::stringstream ss{}; 723 std::stringstream ss{}; 725 ss << "[TaskGroup]> " << __FUNCTION__ 724 ss << "[TaskGroup]> " << __FUNCTION__ << "@" << __LINE__ 726 << " :: nullptr to thread pool"; 725 << " :: nullptr to thread pool"; 727 throw std::runtime_error(ss.str()); 726 throw std::runtime_error(ss.str()); 728 } 727 } 729 728 730 if(m_pool->is_tbb_threadpool()) 729 if(m_pool->is_tbb_threadpool()) 731 { 730 { 732 m_tbb_task_group = new tbb_task_group_ 731 m_tbb_task_group = new tbb_task_group_t{}; 733 } 732 } 734 } 733 } 735 734 736 template <typename Tp, typename Arg, intmax_t 735 template <typename Tp, typename Arg, intmax_t MaxDepth> 737 int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = << 736 int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = GetEnv<int>("PTL_VERBOSE", 0); 738 737 739 } // namespace PTL 738 } // namespace PTL 740 739