Geant4 Cross Reference |
1 // 1 // 2 // MIT License 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentati 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, includ 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distr 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit perso 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the followin 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permiss 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the S 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 18 // 19 // 19 // 20 // ------------------------------------------- 20 // --------------------------------------------------------------- 21 // Tasking class header file 21 // Tasking class header file 22 // 22 // 23 // Class Description: 23 // Class Description: 24 // 24 // 25 // This file creates the a class for handling 25 // This file creates the a class for handling a group of tasks that 26 // can be independently joined 26 // can be independently joined 27 // 27 // 28 // ------------------------------------------- 28 // --------------------------------------------------------------- 29 // Author: Jonathan Madsen (Feb 13th 2018) 29 // Author: Jonathan Madsen (Feb 13th 2018) 30 // ------------------------------------------- 30 // --------------------------------------------------------------- 31 31 32 #pragma once 32 #pragma once 33 33 34 #include "PTL/AutoLock.hh" << 35 #ifndef G4GMAKE << 36 #include "" << 37 #endif << 38 #include "PTL/JoinFunction.hh" << 39 #include "PTL/ScopeDestructor.hh" << 40 #include "PTL/Task.hh" 34 #include "PTL/Task.hh" 41 #include "PTL/ThreadData.hh" << 42 #include "PTL/ThreadPool.hh" 35 #include "PTL/ThreadPool.hh" 43 #include "PTL/Types.hh" << 36 #include "PTL/VTaskGroup.hh" 44 #include "PTL/VTask.hh" << 45 #include "PTL/VUserTaskQueue.hh" << 46 #include "PTL/detail/CxxBackports.hh" << 47 37 48 #include <atomic> << 49 #include <chrono> << 50 #include <cstdint> 38 #include <cstdint> 51 #include <cstdio> << 39 #include <deque> 52 #include <functional> << 53 #include <future> 40 #include <future> 54 #include <iostream> << 41 #include <list> 55 #include <memory> << 56 #include <mutex> << 57 #include <sstream> // IWYU pragma: keep << 58 #include <stdexcept> << 59 #include <thread> << 60 #include <type_traits> << 61 #include <utility> << 62 #include <vector> 42 #include <vector> 63 43 64 #if defined(PTL_USE_TBB) << 44 #ifdef PTL_USE_TBB 65 # include <tbb/task_group.h> // IWYU pragm << 45 # include <tbb/tbb.h> 66 #endif 46 #endif 67 47 68 namespace PTL 48 namespace PTL 69 { 49 { 70 namespace internal << 50 class ThreadPool; >> 51 >> 52 //--------------------------------------------------------------------------------------// >> 53 >> 54 #if !defined(PTL_DEFAULT_OBJECT) >> 55 # define PTL_DEFAULT_OBJECT(NAME) \ >> 56 NAME() = default; \ >> 57 ~NAME() = default; \ >> 58 NAME(const NAME&) = default; \ >> 59 NAME(NAME&&) noexcept = default; \ >> 60 NAME& operator=(const NAME&) = default; \ >> 61 NAME& operator=(NAME&&) noexcept = default; >> 62 #endif >> 63 >> 64 //--------------------------------------------------------------------------------------// >> 65 >> 66 template <typename JoinT, typename JoinArg> >> 67 struct JoinFunction 71 { 68 { 72 std::atomic_uintmax_t& << 69 public: 73 task_group_counter(); << 70 using Type = std::function<JoinT(JoinT&, JoinArg&&)>; 74 71 75 ThreadPool* << 72 public: 76 get_default_threadpool(); << 73 PTL_DEFAULT_OBJECT(JoinFunction) >> 74 >> 75 template <typename Func> >> 76 JoinFunction(Func&& func) >> 77 : m_func(std::forward<Func>(func)) >> 78 {} 77 79 78 intmax_t << 80 template <typename... Args> 79 get_task_depth(); << 81 JoinT& operator()(Args&&... args) 80 } // namespace internal << 82 { >> 83 return std::move(m_func(std::forward<Args>(args)...)); >> 84 } 81 85 82 template <typename Tp, typename Arg = Tp, intm << 86 private: >> 87 Type m_func = [](JoinT& lhs, JoinArg&&) { return lhs; }; >> 88 }; >> 89 >> 90 //--------------------------------------------------------------------------------------// >> 91 >> 92 template <typename JoinArg> >> 93 struct JoinFunction<void, JoinArg> >> 94 { >> 95 public: >> 96 using Type = std::function<void(JoinArg)>; >> 97 >> 98 public: >> 99 PTL_DEFAULT_OBJECT(JoinFunction) >> 100 >> 101 template <typename Func> >> 102 JoinFunction(Func&& func) >> 103 : m_func(std::forward<Func>(func)) >> 104 {} >> 105 >> 106 template <typename... Args> >> 107 void operator()(Args&&... args) >> 108 { >> 109 m_func(std::forward<Args>(args)...); >> 110 } >> 111 >> 112 private: >> 113 Type m_func = [](JoinArg) {}; >> 114 }; >> 115 >> 116 //--------------------------------------------------------------------------------------// >> 117 >> 118 template <> >> 119 struct JoinFunction<void, void> >> 120 { >> 121 public: >> 122 using Type = std::function<void()>; >> 123 >> 124 public: >> 125 PTL_DEFAULT_OBJECT(JoinFunction) >> 126 >> 127 template <typename Func> >> 128 JoinFunction(Func&& func) >> 129 : m_func(std::forward<Func>(func)) >> 130 {} >> 131 >> 132 void operator()() { m_func(); } >> 133 >> 134 private: >> 135 Type m_func = []() {}; >> 136 }; >> 137 >> 138 //--------------------------------------------------------------------------------------// >> 139 >> 140 template <typename Tp, typename Arg = Tp> 83 class TaskGroup 141 class TaskGroup >> 142 : public VTaskGroup >> 143 , public TaskAllocator<TaskGroup<Tp, Arg>> 84 { 144 { 85 public: 145 public: 86 //---------------------------------------- 146 //------------------------------------------------------------------------// 87 template <typename Up> << 147 typedef decay_t<Arg> ArgTp; 88 using container_type = std::vector<Up>; << 148 typedef Tp result_type; 89 << 149 typedef TaskGroup<Tp, Arg> this_type; 90 using tid_type = std::thread << 150 typedef std::promise<ArgTp> promise_type; 91 using size_type = uintmax_t; << 151 typedef std::future<ArgTp> future_type; 92 using lock_t = Mutex; << 152 typedef std::packaged_task<ArgTp()> packaged_task_type; 93 using atomic_int = std::atomic << 153 typedef list_type<future_type> task_list_t; 94 using atomic_uint = std::atomic << 154 typedef typename JoinFunction<Tp, Arg>::Type join_type; 95 using condition_t = Condition; << 155 typedef typename task_list_t::iterator iterator; 96 using ArgTp = decay_t<Arg << 156 typedef typename task_list_t::reverse_iterator reverse_iterator; 97 using result_type = Tp; << 157 typedef typename task_list_t::const_iterator const_iterator; 98 using task_pointer = std::shared << 158 typedef typename task_list_t::const_reverse_iterator const_reverse_iterator; 99 using task_list_t = container_t << 100 using this_type = TaskGroup<T << 101 using promise_type = std::promis << 102 using future_type = std::future << 103 using packaged_task_type = std::packag << 104 using future_list_t = container_t << 105 using join_type = typename Jo << 106 using iterator = typename fu << 107 using reverse_iterator = typename fu << 108 using const_iterator = typename fu << 109 using const_reverse_iterator = typename fu << 110 //---------------------------------------- 159 //------------------------------------------------------------------------// 111 template <typename... Args> 160 template <typename... Args> 112 using task_type = Task<ArgTp, decay_t<Args 161 using task_type = Task<ArgTp, decay_t<Args>...>; 113 //---------------------------------------- 162 //------------------------------------------------------------------------// 114 163 115 public: 164 public: 116 // Constructor 165 // Constructor 117 template <typename Func> 166 template <typename Func> 118 TaskGroup(Func&& _join, ThreadPool* _tp = << 167 TaskGroup(Func&& _join, ThreadPool* _tp = nullptr) 119 << 168 : VTaskGroup(_tp) 120 template <typename Up = Tp> << 169 , m_join(std::forward<Func>(_join)) 121 TaskGroup(ThreadPool* _tp = internal::get_ << 170 {} 122 enable_if_t<std::is_void<Up>::va << 171 template <typename Up = Tp, enable_if_t<std::is_same<Up, void>::value, int> = 0> 123 << 172 explicit TaskGroup(ThreadPool* _tp = nullptr) >> 173 : VTaskGroup(_tp) >> 174 , m_join([]() {}) >> 175 {} 124 // Destructor 176 // Destructor 125 ~TaskGroup(); << 177 virtual ~TaskGroup() { this->clear(); } 126 178 127 // delete copy-construct 179 // delete copy-construct 128 TaskGroup(const this_type&) = delete; 180 TaskGroup(const this_type&) = delete; 129 // define move-construct 181 // define move-construct 130 // NOLINTNEXTLINE(performance-noexcept-mov << 131 TaskGroup(this_type&& rhs) = default; 182 TaskGroup(this_type&& rhs) = default; 132 // delete copy-assign 183 // delete copy-assign 133 TaskGroup& operator=(const this_type& rhs) << 184 this_type& operator=(const this_type& rhs) = delete; 134 // define move-assign 185 // define move-assign 135 // NOLINTNEXTLINE(performance-noexcept-mov << 186 this_type& operator=(this_type&& rhs) = default; 136 TaskGroup& operator=(this_type&& rhs) = de << 137 187 138 public: 188 public: >> 189 //------------------------------------------------------------------------// 139 template <typename Up> 190 template <typename Up> 140 std::shared_ptr<Up> operator+=(std::shared << 191 Up* operator+=(Up* _task) 141 << 142 // wait to finish << 143 void wait(); << 144 << 145 // increment (prefix) << 146 intmax_t operator++() { return ++(m_tot_ta << 147 intmax_t operator++(int) { return (m_tot_t << 148 intmax_t operator--() { return --(m_tot_ta << 149 intmax_t operator--(int) { return (m_tot_t << 150 << 151 // size << 152 intmax_t size() const { return m_tot_task_ << 153 << 154 // get the locks/conditions << 155 lock_t& task_lock() { return m_task_l << 156 condition_t& task_cond() { return m_task_c << 157 << 158 // identifier << 159 uintmax_t id() const { return m_id; } << 160 << 161 // thread pool << 162 void set_pool(ThreadPool* tp) { m_ << 163 ThreadPool*& pool() { return m_pool; } << 164 ThreadPool* pool() const { return m_pool; << 165 << 166 bool is_native_task_group() const { return << 167 bool is_main() const { return this_tid() = << 168 << 169 // check if any tasks are still pending << 170 intmax_t pending() { return m_tot_task_cou << 171 << 172 static void set_verbose(int level) { f_ver << 173 << 174 ScopeDestructor get_scope_destructor(); << 175 << 176 void notify(); << 177 void notify_all(); << 178 << 179 void reserve(size_t _n) << 180 { 192 { 181 m_task_list.reserve(_n); << 193 // store in list 182 m_future_list.reserve(_n); << 194 vtask_list.push_back(_task); >> 195 // thread-safe increment of tasks in task group >> 196 operator++(); >> 197 // add the future >> 198 m_task_set.push_back(std::move(_task->get_future())); >> 199 // return >> 200 return _task; 183 } 201 } 184 202 185 public: 203 public: >> 204 //------------------------------------------------------------------------// 186 template <typename Func, typename... Args> 205 template <typename Func, typename... Args> 187 std::shared_ptr<task_type<Args...>> wrap(F << 206 task_type<Args...>* wrap(Func&& func, Args... args) 188 { 207 { 189 return operator+=(std::make_shared<tas << 208 return operator+=( 190 is_native_task_group(), m_depth, s << 209 new task_type<Args...>(this, std::forward<Func>(func), args...)); 191 } 210 } 192 211 193 template <typename Func, typename... Args, << 212 public: 194 enable_if_t<std::is_void<Up>::value, void> << 213 //------------------------------------------------------------------------// 195 << 214 template <typename Func, typename... Args> 196 template <typename Func, typename... Args, << 215 void exec(Func&& func, Args... args) 197 enable_if_t<!std::is_void<Up>::value, void << 216 { 198 << 217 m_pool->add_task(wrap(std::forward<Func>(func), args...)); >> 218 } >> 219 //------------------------------------------------------------------------// 199 template <typename Func, typename... Args> 220 template <typename Func, typename... Args> 200 void run(Func func, Args... args) << 221 void run(Func&& func, Args... args) 201 { 222 { 202 exec(std::move(func), std::move(args). << 223 m_pool->add_task(wrap(std::forward<Func>(func), args...)); >> 224 } >> 225 //------------------------------------------------------------------------// >> 226 template <typename Func, typename... Args> >> 227 void parallel_for(const intmax_t& nitr, const intmax_t& chunks, Func&& func, >> 228 Args... args) >> 229 { >> 230 auto nsplit = nitr / chunks; >> 231 auto nmod = nitr % chunks; >> 232 if(nsplit < 1) >> 233 nsplit = 1; >> 234 for(intmax_t n = 0; n < nsplit; ++n) >> 235 { >> 236 auto _beg = n * chunks; >> 237 auto _end = (n + 1) * chunks + ((n + 1 == nsplit) ? nmod : 0); >> 238 run(std::forward<Func>(func), std::move(_beg), std::move(_end), args...); >> 239 } 203 } 240 } 204 241 205 protected: 242 protected: 206 template <typename Up, typename Func, type << 243 //------------------------------------------------------------------------// 207 enable_if_t<std::is_void<Up>::value, void> << 208 << 209 template <typename Up, typename Func, type << 210 enable_if_t<!std::is_void<Up>::value, void << 211 << 212 // shorter typedefs 244 // shorter typedefs 213 using itr_t = iterator; << 245 typedef iterator itr_t; 214 using citr_t = const_iterator; << 246 typedef const_iterator citr_t; 215 using ritr_t = reverse_iterator; << 247 typedef reverse_iterator ritr_t; 216 using critr_t = const_reverse_iterator; << 248 typedef const_reverse_iterator critr_t; 217 249 218 public: 250 public: 219 //---------------------------------------- 251 //------------------------------------------------------------------------// 220 // Get tasks with non-void return types 252 // Get tasks with non-void return types 221 // 253 // 222 future_list_t& get_tasks() { return << 254 task_list_t& get_tasks() { return m_task_set; } 223 const future_list_t& get_tasks() const { r << 255 const task_list_t& get_tasks() const { return m_task_set; } 224 256 225 //---------------------------------------- 257 //------------------------------------------------------------------------// 226 // iterate over tasks with return type 258 // iterate over tasks with return type 227 // 259 // 228 itr_t begin() { return m_future_list.beg << 260 itr_t begin() { return m_task_set.begin(); } 229 itr_t end() { return m_future_list.end() << 261 itr_t end() { return m_task_set.end(); } 230 citr_t begin() const { return m_future_li << 262 citr_t begin() const { return m_task_set.begin(); } 231 citr_t end() const { return m_future_list << 263 citr_t end() const { return m_task_set.end(); } 232 citr_t cbegin() const { return m_future_l << 264 citr_t cbegin() const { return m_task_set.begin(); } 233 citr_t cend() const { return m_future_lis << 265 citr_t cend() const { return m_task_set.end(); } 234 ritr_t rbegin() { return m_future_list.rb << 266 ritr_t rbegin() { return m_task_set.rbegin(); } 235 ritr_t rend() { return m_future_list.rend << 267 ritr_t rend() { return m_task_set.rend(); } 236 critr_t rbegin() const { return m_future_l << 268 critr_t rbegin() const { return m_task_set.rbegin(); } 237 critr_t rend() const { return m_future_lis << 269 critr_t rend() const { return m_task_set.rend(); } 238 270 239 //---------------------------------------- 271 //------------------------------------------------------------------------// 240 // wait to finish 272 // wait to finish 241 template <typename Up = Tp, enable_if_t<!s 273 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value, int> = 0> 242 inline Up join(Up accum = {}); << 274 inline Up join(Up accum = {}) >> 275 { >> 276 this->wait(); >> 277 for(auto& itr : m_task_set) >> 278 { >> 279 using RetT = decay_t<decltype(itr.get())>; >> 280 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get()))); >> 281 } >> 282 this->clear(); >> 283 return accum; >> 284 } 243 //---------------------------------------- 285 //------------------------------------------------------------------------// 244 // wait to finish 286 // wait to finish 245 template <typename Up = Tp, typename Rp = 287 template <typename Up = Tp, typename Rp = Arg, 246 enable_if_t<std::is_void<Up>::va 288 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int> = 0> 247 inline void join(); << 289 inline void join() >> 290 { >> 291 this->wait(); >> 292 for(auto& itr : m_task_set) >> 293 itr.get(); >> 294 m_join(); >> 295 this->clear(); >> 296 } 248 //---------------------------------------- 297 //------------------------------------------------------------------------// 249 // wait to finish 298 // wait to finish 250 template <typename Up = Tp, typename Rp = 299 template <typename Up = Tp, typename Rp = Arg, 251 enable_if_t<std::is_void<Up>::va 300 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int> = 0> 252 inline void join(); << 301 inline void join() 253 //---------------------------------------- << 254 // clear the task result history << 255 void clear(); << 256 << 257 protected: << 258 //---------------------------------------- << 259 // get the thread id << 260 static tid_type this_tid() { return std::t << 261 << 262 //---------------------------------------- << 263 // get the task count << 264 atomic_int& task_count() { return m_ << 265 const atomic_int& task_count() const { ret << 266 << 267 protected: << 268 static int f_verbose; << 269 // Private variables << 270 uintmax_t m_id = internal::t << 271 intmax_t m_depth = internal::g << 272 tid_type m_main_tid = std::this_t << 273 atomic_int m_tot_task_count{ 0 }; << 274 lock_t m_task_lock = {}; << 275 condition_t m_task_cond = {}; << 276 join_type m_join{}; << 277 ThreadPool* m_pool = inter << 278 tbb_task_group_t* m_tbb_task_group = nullp << 279 task_list_t m_task_list = {}; << 280 future_list_t m_future_list = {}; << 281 << 282 private: << 283 void internal_update(); << 284 }; << 285 << 286 } // namespace PTL << 287 namespace PTL << 288 { << 289 template <typename Tp, typename Arg, intmax_t << 290 template <typename Func> << 291 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& << 292 : m_join{ std::forward<Func>(_join) } << 293 , m_pool{ _tp } << 294 { << 295 internal_update(); << 296 } << 297 << 298 template <typename Tp, typename Arg, intmax_t << 299 template <typename Up> << 300 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Thread << 301 enable << 302 : m_join{ []() {} } << 303 , m_pool{ _tp } << 304 { << 305 internal_update(); << 306 } << 307 << 308 // Destructor << 309 template <typename Tp, typename Arg, intmax_t << 310 TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup() << 311 { << 312 { 302 { 313 // task will decrement counter and the << 314 // condition variable so acquiring loc << 315 // task group from being destroyed bef << 316 AutoLock _lk{ m_task_lock, std::defer_ << 317 if(!_lk.owns_lock()) << 318 _lk.lock(); << 319 } << 320 << 321 if(m_tbb_task_group) << 322 { << 323 auto* _arena = m_pool->get_task_arena( << 324 _arena->execute([this]() { this->m_tbb << 325 } << 326 delete m_tbb_task_group; << 327 this->clear(); << 328 } << 329 << 330 template <typename Tp, typename Arg, intmax_t << 331 template <typename Up> << 332 std::shared_ptr<Up> << 333 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std:: << 334 { << 335 // thread-safe increment of tasks in task << 336 operator++(); << 337 // copy the shared pointer to abstract ins << 338 m_task_list.push_back(_task); << 339 // return the derived instance << 340 return std::move(_task); << 341 } << 342 << 343 template <typename Tp, typename Arg, intmax_t << 344 void << 345 TaskGroup<Tp, Arg, MaxDepth>::wait() << 346 { << 347 auto _dtor = ScopeDestructor{ [&]() { << 348 if(m_tbb_task_group) << 349 { << 350 auto* _arena = m_pool->get_task_ar << 351 _arena->execute([this]() { this->m << 352 } << 353 } }; << 354 << 355 ThreadData* data = ThreadData::GetInstance << 356 if(!data) << 357 return; << 358 << 359 // if no pool was initially present at cre << 360 if(!m_pool) << 361 { << 362 // check for master MT run-manager << 363 m_pool = internal::get_default_threadp << 364 << 365 // if no thread pool created << 366 if(!m_pool) << 367 { << 368 if(f_verbose > 0) << 369 { << 370 fprintf(stderr, "%s @ %i :: Wa << 371 __FUNCTION__, __LINE__ << 372 std::cerr << __FUNCTION__ << " << 373 << "nullptr to threa << 374 } << 375 return; << 376 } << 377 } << 378 << 379 ThreadPool* tpool = (m_pool) ? m_pool << 380 VUserTaskQueue* taskq = (tpool) ? tpool->g << 381 << 382 bool _is_main = data->is_main; << 383 bool _within_task = data->within_task; << 384 << 385 auto is_active_state = [&]() { << 386 return (tpool->state()->load(std::memo << 387 thread_pool::state::STOPPED); << 388 }; << 389 << 390 auto execute_this_threads_tasks = [&]() { << 391 if(!taskq) << 392 return; << 393 << 394 // only want to process if within a ta << 395 if((!_is_main || tpool->size() < 2) && << 396 { << 397 int bin = static_cast<int>(taskq-> << 398 // const auto nitr = (tpool) ? tpo << 399 // Thread::hardware_concurrency(); << 400 while(this->pending() > 0) << 401 { << 402 if(!taskq->empty()) << 403 { << 404 auto _task = taskq->GetTas << 405 if(_task) << 406 (*_task)(); << 407 } << 408 } << 409 } << 410 }; << 411 << 412 // checks for validity << 413 if(!is_native_task_group()) << 414 { << 415 // for external threads << 416 if(!_is_main || tpool->size() < 2) << 417 return; << 418 } << 419 else if(f_verbose > 0) << 420 { << 421 if(!tpool || !taskq) << 422 { << 423 // something is wrong, didn't crea << 424 fprintf(stderr, << 425 "%s @ %i :: Warning! nullp << 426 "(%p)\n", << 427 __FUNCTION__, __LINE__, st << 428 static_cast<void*>(taskq)) << 429 } << 430 // return if thread pool isn't built << 431 else if(is_native_task_group() && !tpo << 432 { << 433 fprintf(stderr, "%s @ %i :: Warnin << 434 __FUNCTION__, __LINE__); << 435 } << 436 else if(!is_active_state()) << 437 { << 438 fprintf(stderr, "%s @ %i :: Warnin << 439 __FUNCTION__, __LINE__); << 440 } << 441 } << 442 << 443 intmax_t wake_size = 2; << 444 AutoLock _lock(m_task_lock, std::defer_loc << 445 << 446 while(is_active_state()) << 447 { << 448 execute_this_threads_tasks(); << 449 << 450 // while loop protects against spuriou << 451 while(_is_main && pending() > 0 && is_ << 452 { << 453 // auto _wake = [&]() { return (wa << 454 // !is_active_state()); << 455 // }; << 456 << 457 // lock before sleeping on conditi << 458 if(!_lock.owns_lock()) << 459 _lock.lock(); << 460 << 461 // Wait until signaled that a task << 462 // Unlock mutex while wait, then l << 463 // when true, this wakes the threa << 464 if(pending() >= wake_size) << 465 { << 466 m_task_cond.wait(_lock); << 467 } << 468 else << 469 { << 470 m_task_cond.wait_for(_lock, st << 471 } << 472 // unlock << 473 if(_lock.owns_lock()) << 474 _lock.unlock(); << 475 } << 476 << 477 // if pending is not greater than zero << 478 if(pending() <= 0) << 479 break; << 480 } << 481 << 482 if(_lock.owns_lock()) << 483 _lock.unlock(); << 484 << 485 intmax_t ntask = this->task_count().load() << 486 if(ntask > 0) << 487 { << 488 std::stringstream ss; << 489 ss << "\nWarning! Join operation issue << 490 << "are running!" << std::endl; << 491 std::cerr << ss.str(); << 492 this->wait(); 303 this->wait(); 493 } << 304 for(auto& itr : m_task_set) 494 } << 495 << 496 template <typename Tp, typename Arg, intmax_t << 497 ScopeDestructor << 498 TaskGroup<Tp, Arg, MaxDepth>::get_scope_destru << 499 { << 500 auto& _counter = m_tot_task_count; << 501 auto& _task_cond = task_cond(); << 502 auto& _task_lock = task_lock(); << 503 return ScopeDestructor{ [&_task_cond, &_ta << 504 auto _count = --(_counter); << 505 if(_count < 1) << 506 { << 507 AutoLock _lk{ _task_lock }; << 508 _task_cond.notify_all(); << 509 } << 510 } }; << 511 } << 512 << 513 template <typename Tp, typename Arg, intmax_t << 514 void << 515 TaskGroup<Tp, Arg, MaxDepth>::notify() << 516 { << 517 AutoLock _lk{ m_task_lock }; << 518 m_task_cond.notify_one(); << 519 } << 520 << 521 template <typename Tp, typename Arg, intmax_t << 522 void << 523 TaskGroup<Tp, Arg, MaxDepth>::notify_all() << 524 { << 525 AutoLock _lk{ m_task_lock }; << 526 m_task_cond.notify_all(); << 527 } << 528 << 529 template <typename Tp, typename Arg, intmax_t << 530 template <typename Func, typename... Args, typ << 531 enable_if_t<std::is_void<Up>::value, void> << 532 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, << 533 { << 534 if(MaxDepth > 0 && !m_tbb_task_group && Th << 535 ThreadData::GetInstance()->task_depth > << 536 { << 537 local_exec<Tp>(std::move(func), std::m << 538 } << 539 else << 540 { << 541 auto& _counter = m_tot_task_count; << 542 auto& _task_cond = task_cond(); << 543 auto& _task_lock = task_lock(); << 544 auto _task = wrap([&_task_cond, << 545 auto* _tdata = ThreadData::GetInst << 546 if(_tdata) << 547 ++(_tdata->task_depth); << 548 func(args...); << 549 auto _count = --(_counter); << 550 if(_tdata) << 551 --(_tdata->task_depth); << 552 if(_count < 1) << 553 { << 554 AutoLock _lk{ _task_lock }; << 555 _task_cond.notify_all(); << 556 } << 557 }); << 558 << 559 if(m_tbb_task_group) << 560 { << 561 auto* _arena = m_pool->ge << 562 auto* _tbb_task_group = m_tbb_task << 563 auto* _ptask = _task.get( << 564 _arena->execute([_tbb_task_group, << 565 _tbb_task_group->run([_ptask]( << 566 }); << 567 } << 568 else << 569 { << 570 m_pool->add_task(std::move(_task)) << 571 } << 572 } << 573 } << 574 template <typename Tp, typename Arg, intmax_t << 575 template <typename Func, typename... Args, typ << 576 enable_if_t<!std::is_void<Up>::value, void> << 577 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, << 578 { << 579 if(MaxDepth > 0 && !m_tbb_task_group && Th << 580 ThreadData::GetInstance()->task_depth > << 581 { << 582 local_exec<Tp>(std::move(func), std::m << 583 } << 584 else << 585 { << 586 auto& _counter = m_tot_task_count; << 587 auto& _task_cond = task_cond(); << 588 auto& _task_lock = task_lock(); << 589 auto _task = wrap([&_task_cond, << 590 auto* _tdata = ThreadData::GetInst << 591 if(_tdata) << 592 ++(_tdata->task_depth); << 593 auto&& _ret = func(args...); << 594 auto _count = --(_counter); << 595 if(_tdata) << 596 --(_tdata->task_depth); << 597 if(_count < 1) << 598 { << 599 AutoLock _lk{ _task_lock }; << 600 _task_cond.notify_all(); << 601 } << 602 return std::forward<decltype(_ret) << 603 }); << 604 << 605 if(m_tbb_task_group) << 606 { << 607 auto* _arena = m_pool->ge << 608 auto* _tbb_task_group = m_tbb_task << 609 auto* _ptask = _task.get( << 610 _arena->execute([_tbb_task_group, << 611 _tbb_task_group->run([_ptask]( << 612 }); << 613 } << 614 else << 615 { 305 { 616 m_pool->add_task(std::move(_task)) << 306 using RetT = decay_t<decltype(itr.get())>; >> 307 m_join(std::forward<RetT>(itr.get())); 617 } 308 } >> 309 this->clear(); 618 } 310 } 619 } << 311 //------------------------------------------------------------------------// 620 << 312 // clear the task result history 621 template <typename Tp, typename Arg, intmax_t << 313 void clear() 622 template <typename Up, typename Func, typename << 623 enable_if_t<std::is_void<Up>::value, void> << 624 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func << 625 { << 626 auto* _tdata = ThreadData::GetInstance(); << 627 if(_tdata) << 628 ++(_tdata->task_depth); << 629 promise_type _p{}; << 630 m_future_list.emplace_back(_p.get_future() << 631 func(args...); << 632 _p.set_value(); << 633 if(_tdata) << 634 --(_tdata->task_depth); << 635 } << 636 << 637 template <typename Tp, typename Arg, intmax_t << 638 template <typename Up, typename Func, typename << 639 enable_if_t<!std::is_void<Up>::value, void> << 640 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func << 641 { << 642 auto* _tdata = ThreadData::GetInstance(); << 643 if(_tdata) << 644 ++(_tdata->task_depth); << 645 promise_type _p{}; << 646 m_future_list.emplace_back(_p.get_future() << 647 _p.set_value(func(args...)); << 648 if(_tdata) << 649 --(_tdata->task_depth); << 650 } << 651 << 652 template <typename Tp, typename Arg, intmax_t << 653 template <typename Up, enable_if_t<!std::is_vo << 654 inline Up << 655 TaskGroup<Tp, Arg, MaxDepth>::join(Up accum) << 656 { << 657 this->wait(); << 658 for(auto& itr : m_task_list) << 659 { << 660 using RetT = decay_t<decltype(itr->get << 661 accum = std::move(m_join(std::ref << 662 } << 663 for(auto& itr : m_future_list) << 664 { << 665 using RetT = decay_t<decltype(itr.get( << 666 accum = std::move(m_join(std::ref << 667 } << 668 this->clear(); << 669 return accum; << 670 } << 671 << 672 template <typename Tp, typename Arg, intmax_t << 673 template <typename Up, typename Rp, << 674 enable_if_t<std::is_void<Up>::value << 675 inline void << 676 TaskGroup<Tp, Arg, MaxDepth>::join() << 677 { << 678 this->wait(); << 679 for(auto& itr : m_task_list) << 680 itr->get(); << 681 for(auto& itr : m_future_list) << 682 itr.get(); << 683 m_join(); << 684 this->clear(); << 685 } << 686 << 687 template <typename Tp, typename Arg, intmax_t << 688 template <typename Up, typename Rp, << 689 enable_if_t<std::is_void<Up>::value << 690 inline void << 691 TaskGroup<Tp, Arg, MaxDepth>::join() << 692 { << 693 this->wait(); << 694 for(auto& itr : m_task_list) << 695 { << 696 using RetT = decay_t<decltype(itr->get << 697 m_join(std::forward<RetT>(itr->get())) << 698 } << 699 for(auto& itr : m_future_list) << 700 { << 701 using RetT = decay_t<decltype(itr.get( << 702 m_join(std::forward<RetT>(itr.get())); << 703 } << 704 this->clear(); << 705 } << 706 << 707 template <typename Tp, typename Arg, intmax_t << 708 void << 709 TaskGroup<Tp, Arg, MaxDepth>::clear() << 710 { << 711 m_future_list.clear(); << 712 m_task_list.clear(); << 713 } << 714 << 715 template <typename Tp, typename Arg, intmax_t << 716 void << 717 TaskGroup<Tp, Arg, MaxDepth>::internal_update( << 718 { << 719 if(!m_pool) << 720 m_pool = internal::get_default_threadp << 721 << 722 if(!m_pool) << 723 { << 724 std::stringstream ss{}; << 725 ss << "[TaskGroup]> " << __FUNCTION__ << 726 << " :: nullptr to thread pool"; << 727 throw std::runtime_error(ss.str()); << 728 } << 729 << 730 if(m_pool->is_tbb_threadpool()) << 731 { 314 { 732 m_tbb_task_group = new tbb_task_group_ << 315 m_task_set.clear(); >> 316 VTaskGroup::clear(); 733 } 317 } 734 } << 735 318 736 template <typename Tp, typename Arg, intmax_t << 319 protected: 737 int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = << 320 // Protected variables >> 321 task_list_t m_task_set; >> 322 join_type m_join; >> 323 }; 738 324 739 } // namespace PTL 325 } // namespace PTL 740 326