Geant4 Cross Reference |
1 // 1 // 2 // MIT License 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentati 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, includ 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distr 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit perso 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the followin 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permiss 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the S 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 18 // 19 // 19 // 20 // ------------------------------------------- 20 // --------------------------------------------------------------- 21 // Tasking class header file 21 // Tasking class header file 22 // 22 // 23 // Class Description: 23 // Class Description: 24 // 24 // 25 // This file creates the a class for handling 25 // This file creates the a class for handling a group of tasks that 26 // can be independently joined 26 // can be independently joined 27 // 27 // 28 // ------------------------------------------- 28 // --------------------------------------------------------------- 29 // Author: Jonathan Madsen (Feb 13th 2018) 29 // Author: Jonathan Madsen (Feb 13th 2018) 30 // ------------------------------------------- 30 // --------------------------------------------------------------- 31 31 32 #pragma once 32 #pragma once 33 33 34 #include "PTL/AutoLock.hh" << 35 #ifndef G4GMAKE << 36 #include "" << 37 #endif << 38 #include "PTL/JoinFunction.hh" 34 #include "PTL/JoinFunction.hh" 39 #include "PTL/ScopeDestructor.hh" << 40 #include "PTL/Task.hh" 35 #include "PTL/Task.hh" 41 #include "PTL/ThreadData.hh" 36 #include "PTL/ThreadData.hh" 42 #include "PTL/ThreadPool.hh" 37 #include "PTL/ThreadPool.hh" 43 #include "PTL/Types.hh" << 44 #include "PTL/VTask.hh" << 45 #include "PTL/VUserTaskQueue.hh" << 46 #include "PTL/detail/CxxBackports.hh" << 47 38 48 #include <atomic> 39 #include <atomic> 49 #include <chrono> << 50 #include <cstdint> 40 #include <cstdint> 51 #include <cstdio> << 41 #include <cstdlib> 52 #include <functional> << 42 #include <deque> 53 #include <future> 43 #include <future> 54 #include <iostream> 44 #include <iostream> 55 #include <memory> 45 #include <memory> 56 #include <mutex> << 57 #include <sstream> // IWYU pragma: keep << 58 #include <stdexcept> << 59 #include <thread> << 60 #include <type_traits> << 61 #include <utility> << 62 #include <vector> 46 #include <vector> 63 47 64 #if defined(PTL_USE_TBB) 48 #if defined(PTL_USE_TBB) 65 # include <tbb/task_group.h> // IWYU pragm << 49 # include <tbb/tbb.h> 66 #endif 50 #endif 67 51 68 namespace PTL 52 namespace PTL 69 { 53 { 70 namespace internal 54 namespace internal 71 { 55 { 72 std::atomic_uintmax_t& 56 std::atomic_uintmax_t& 73 task_group_counter(); 57 task_group_counter(); 74 58 75 ThreadPool* 59 ThreadPool* 76 get_default_threadpool(); 60 get_default_threadpool(); 77 61 78 intmax_t 62 intmax_t 79 get_task_depth(); 63 get_task_depth(); 80 } // namespace internal 64 } // namespace internal 81 << 82 template <typename Tp, typename Arg = Tp, intm 65 template <typename Tp, typename Arg = Tp, intmax_t MaxDepth = 0> 83 class TaskGroup 66 class TaskGroup 84 { 67 { 85 public: 68 public: 86 //---------------------------------------- 69 //------------------------------------------------------------------------// 87 template <typename Up> 70 template <typename Up> 88 using container_type = std::vector<Up>; 71 using container_type = std::vector<Up>; 89 72 90 using tid_type = std::thread 73 using tid_type = std::thread::id; 91 using size_type = uintmax_t; 74 using size_type = uintmax_t; 92 using lock_t = Mutex; 75 using lock_t = Mutex; 93 using atomic_int = std::atomic 76 using atomic_int = std::atomic_intmax_t; 94 using atomic_uint = std::atomic 77 using atomic_uint = std::atomic_uintmax_t; 95 using condition_t = Condition; 78 using condition_t = Condition; 96 using ArgTp = decay_t<Arg 79 using ArgTp = decay_t<Arg>; 97 using result_type = Tp; 80 using result_type = Tp; 98 using task_pointer = std::shared 81 using task_pointer = std::shared_ptr<TaskFuture<ArgTp>>; 99 using task_list_t = container_t 82 using task_list_t = container_type<task_pointer>; 100 using this_type = TaskGroup<T 83 using this_type = TaskGroup<Tp, Arg, MaxDepth>; 101 using promise_type = std::promis 84 using promise_type = std::promise<ArgTp>; 102 using future_type = std::future 85 using future_type = std::future<ArgTp>; 103 using packaged_task_type = std::packag 86 using packaged_task_type = std::packaged_task<ArgTp()>; 104 using future_list_t = container_t 87 using future_list_t = container_type<future_type>; 105 using join_type = typename Jo 88 using join_type = typename JoinFunction<Tp, Arg>::Type; 106 using iterator = typename fu 89 using iterator = typename future_list_t::iterator; 107 using reverse_iterator = typename fu 90 using reverse_iterator = typename future_list_t::reverse_iterator; 108 using const_iterator = typename fu 91 using const_iterator = typename future_list_t::const_iterator; 109 using const_reverse_iterator = typename fu 92 using const_reverse_iterator = typename future_list_t::const_reverse_iterator; 110 //---------------------------------------- 93 //------------------------------------------------------------------------// 111 template <typename... Args> 94 template <typename... Args> 112 using task_type = Task<ArgTp, decay_t<Args 95 using task_type = Task<ArgTp, decay_t<Args>...>; 113 //---------------------------------------- 96 //------------------------------------------------------------------------// 114 97 115 public: 98 public: 116 // Constructor 99 // Constructor 117 template <typename Func> 100 template <typename Func> 118 TaskGroup(Func&& _join, ThreadPool* _tp = 101 TaskGroup(Func&& _join, ThreadPool* _tp = internal::get_default_threadpool()); 119 102 120 template <typename Up = Tp> 103 template <typename Up = Tp> 121 TaskGroup(ThreadPool* _tp = internal::get_ 104 TaskGroup(ThreadPool* _tp = internal::get_default_threadpool(), 122 enable_if_t<std::is_void<Up>::va 105 enable_if_t<std::is_void<Up>::value, int> = 0); 123 106 124 // Destructor 107 // Destructor 125 ~TaskGroup(); 108 ~TaskGroup(); 126 109 127 // delete copy-construct 110 // delete copy-construct 128 TaskGroup(const this_type&) = delete; 111 TaskGroup(const this_type&) = delete; 129 // define move-construct 112 // define move-construct 130 // NOLINTNEXTLINE(performance-noexcept-mov << 131 TaskGroup(this_type&& rhs) = default; 113 TaskGroup(this_type&& rhs) = default; 132 // delete copy-assign 114 // delete copy-assign 133 TaskGroup& operator=(const this_type& rhs) << 115 this_type& operator=(const this_type& rhs) = delete; 134 // define move-assign 116 // define move-assign 135 // NOLINTNEXTLINE(performance-noexcept-mov << 117 this_type& operator=(this_type&& rhs) = default; 136 TaskGroup& operator=(this_type&& rhs) = de << 137 118 138 public: 119 public: 139 template <typename Up> 120 template <typename Up> 140 std::shared_ptr<Up> operator+=(std::shared 121 std::shared_ptr<Up> operator+=(std::shared_ptr<Up>&& _task); 141 122 142 // wait to finish 123 // wait to finish 143 void wait(); 124 void wait(); 144 125 145 // increment (prefix) 126 // increment (prefix) 146 intmax_t operator++() { return ++(m_tot_ta 127 intmax_t operator++() { return ++(m_tot_task_count); } 147 intmax_t operator++(int) { return (m_tot_t 128 intmax_t operator++(int) { return (m_tot_task_count)++; } 148 intmax_t operator--() { return --(m_tot_ta 129 intmax_t operator--() { return --(m_tot_task_count); } 149 intmax_t operator--(int) { return (m_tot_t 130 intmax_t operator--(int) { return (m_tot_task_count)--; } 150 131 151 // size 132 // size 152 intmax_t size() const { return m_tot_task_ 133 intmax_t size() const { return m_tot_task_count.load(); } 153 134 154 // get the locks/conditions 135 // get the locks/conditions 155 lock_t& task_lock() { return m_task_l 136 lock_t& task_lock() { return m_task_lock; } 156 condition_t& task_cond() { return m_task_c 137 condition_t& task_cond() { return m_task_cond; } 157 138 158 // identifier 139 // identifier 159 uintmax_t id() const { return m_id; } 140 uintmax_t id() const { return m_id; } 160 141 161 // thread pool 142 // thread pool 162 void set_pool(ThreadPool* tp) { m_ 143 void set_pool(ThreadPool* tp) { m_pool = tp; } 163 ThreadPool*& pool() { return m_pool; } 144 ThreadPool*& pool() { return m_pool; } 164 ThreadPool* pool() const { return m_pool; 145 ThreadPool* pool() const { return m_pool; } 165 146 166 bool is_native_task_group() const { return << 147 bool is_native_task_group() const { return (m_tbb_task_group) ? false : true; } 167 bool is_main() const { return this_tid() = 148 bool is_main() const { return this_tid() == m_main_tid; } 168 149 169 // check if any tasks are still pending 150 // check if any tasks are still pending 170 intmax_t pending() { return m_tot_task_cou 151 intmax_t pending() { return m_tot_task_count.load(); } 171 152 172 static void set_verbose(int level) { f_ver 153 static void set_verbose(int level) { f_verbose = level; } 173 154 174 ScopeDestructor get_scope_destructor(); 155 ScopeDestructor get_scope_destructor(); 175 156 176 void notify(); 157 void notify(); 177 void notify_all(); 158 void notify_all(); 178 << 179 void reserve(size_t _n) 159 void reserve(size_t _n) 180 { 160 { 181 m_task_list.reserve(_n); 161 m_task_list.reserve(_n); 182 m_future_list.reserve(_n); 162 m_future_list.reserve(_n); 183 } 163 } 184 164 185 public: 165 public: 186 template <typename Func, typename... Args> 166 template <typename Func, typename... Args> 187 std::shared_ptr<task_type<Args...>> wrap(F 167 std::shared_ptr<task_type<Args...>> wrap(Func func, Args... args) 188 { 168 { 189 return operator+=(std::make_shared<tas 169 return operator+=(std::make_shared<task_type<Args...>>( 190 is_native_task_group(), m_depth, s 170 is_native_task_group(), m_depth, std::move(func), std::move(args)...)); 191 } 171 } 192 172 193 template <typename Func, typename... Args, 173 template <typename Func, typename... Args, typename Up = Tp> 194 enable_if_t<std::is_void<Up>::value, void> 174 enable_if_t<std::is_void<Up>::value, void> exec(Func func, Args... args); 195 175 196 template <typename Func, typename... Args, 176 template <typename Func, typename... Args, typename Up = Tp> 197 enable_if_t<!std::is_void<Up>::value, void 177 enable_if_t<!std::is_void<Up>::value, void> exec(Func func, Args... args); 198 178 199 template <typename Func, typename... Args> 179 template <typename Func, typename... Args> 200 void run(Func func, Args... args) 180 void run(Func func, Args... args) 201 { 181 { 202 exec(std::move(func), std::move(args). 182 exec(std::move(func), std::move(args)...); 203 } 183 } 204 184 205 protected: 185 protected: 206 template <typename Up, typename Func, type 186 template <typename Up, typename Func, typename... Args> 207 enable_if_t<std::is_void<Up>::value, void> 187 enable_if_t<std::is_void<Up>::value, void> local_exec(Func func, Args... args); 208 188 209 template <typename Up, typename Func, type 189 template <typename Up, typename Func, typename... Args> 210 enable_if_t<!std::is_void<Up>::value, void 190 enable_if_t<!std::is_void<Up>::value, void> local_exec(Func func, Args... args); 211 191 212 // shorter typedefs 192 // shorter typedefs 213 using itr_t = iterator; 193 using itr_t = iterator; 214 using citr_t = const_iterator; 194 using citr_t = const_iterator; 215 using ritr_t = reverse_iterator; 195 using ritr_t = reverse_iterator; 216 using critr_t = const_reverse_iterator; 196 using critr_t = const_reverse_iterator; 217 197 218 public: 198 public: 219 //---------------------------------------- 199 //------------------------------------------------------------------------// 220 // Get tasks with non-void return types 200 // Get tasks with non-void return types 221 // 201 // 222 future_list_t& get_tasks() { return 202 future_list_t& get_tasks() { return m_future_list; } 223 const future_list_t& get_tasks() const { r 203 const future_list_t& get_tasks() const { return m_future_list; } 224 204 225 //---------------------------------------- 205 //------------------------------------------------------------------------// 226 // iterate over tasks with return type 206 // iterate over tasks with return type 227 // 207 // 228 itr_t begin() { return m_future_list.beg 208 itr_t begin() { return m_future_list.begin(); } 229 itr_t end() { return m_future_list.end() 209 itr_t end() { return m_future_list.end(); } 230 citr_t begin() const { return m_future_li 210 citr_t begin() const { return m_future_list.begin(); } 231 citr_t end() const { return m_future_list 211 citr_t end() const { return m_future_list.end(); } 232 citr_t cbegin() const { return m_future_l 212 citr_t cbegin() const { return m_future_list.begin(); } 233 citr_t cend() const { return m_future_lis 213 citr_t cend() const { return m_future_list.end(); } 234 ritr_t rbegin() { return m_future_list.rb 214 ritr_t rbegin() { return m_future_list.rbegin(); } 235 ritr_t rend() { return m_future_list.rend 215 ritr_t rend() { return m_future_list.rend(); } 236 critr_t rbegin() const { return m_future_l 216 critr_t rbegin() const { return m_future_list.rbegin(); } 237 critr_t rend() const { return m_future_lis 217 critr_t rend() const { return m_future_list.rend(); } 238 218 239 //---------------------------------------- 219 //------------------------------------------------------------------------// 240 // wait to finish 220 // wait to finish 241 template <typename Up = Tp, enable_if_t<!s 221 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value, int> = 0> 242 inline Up join(Up accum = {}); 222 inline Up join(Up accum = {}); 243 //---------------------------------------- 223 //------------------------------------------------------------------------// 244 // wait to finish 224 // wait to finish 245 template <typename Up = Tp, typename Rp = 225 template <typename Up = Tp, typename Rp = Arg, 246 enable_if_t<std::is_void<Up>::va 226 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int> = 0> 247 inline void join(); 227 inline void join(); 248 //---------------------------------------- 228 //------------------------------------------------------------------------// 249 // wait to finish 229 // wait to finish 250 template <typename Up = Tp, typename Rp = 230 template <typename Up = Tp, typename Rp = Arg, 251 enable_if_t<std::is_void<Up>::va 231 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int> = 0> 252 inline void join(); 232 inline void join(); 253 //---------------------------------------- 233 //------------------------------------------------------------------------// 254 // clear the task result history 234 // clear the task result history 255 void clear(); 235 void clear(); 256 236 257 protected: 237 protected: 258 //---------------------------------------- 238 //------------------------------------------------------------------------// 259 // get the thread id 239 // get the thread id 260 static tid_type this_tid() { return std::t 240 static tid_type this_tid() { return std::this_thread::get_id(); } 261 241 262 //---------------------------------------- 242 //------------------------------------------------------------------------// 263 // get the task count 243 // get the task count 264 atomic_int& task_count() { return m_ 244 atomic_int& task_count() { return m_tot_task_count; } 265 const atomic_int& task_count() const { ret 245 const atomic_int& task_count() const { return m_tot_task_count; } 266 246 267 protected: 247 protected: 268 static int f_verbose; 248 static int f_verbose; 269 // Private variables 249 // Private variables 270 uintmax_t m_id = internal::t 250 uintmax_t m_id = internal::task_group_counter()++; 271 intmax_t m_depth = internal::g 251 intmax_t m_depth = internal::get_task_depth(); 272 tid_type m_main_tid = std::this_t 252 tid_type m_main_tid = std::this_thread::get_id(); 273 atomic_int m_tot_task_count{ 0 }; 253 atomic_int m_tot_task_count{ 0 }; 274 lock_t m_task_lock = {}; 254 lock_t m_task_lock = {}; 275 condition_t m_task_cond = {}; 255 condition_t m_task_cond = {}; 276 join_type m_join{}; 256 join_type m_join{}; 277 ThreadPool* m_pool = inter 257 ThreadPool* m_pool = internal::get_default_threadpool(); 278 tbb_task_group_t* m_tbb_task_group = nullp 258 tbb_task_group_t* m_tbb_task_group = nullptr; 279 task_list_t m_task_list = {}; 259 task_list_t m_task_list = {}; 280 future_list_t m_future_list = {}; 260 future_list_t m_future_list = {}; 281 261 282 private: 262 private: 283 void internal_update(); 263 void internal_update(); 284 }; 264 }; 285 265 286 } // namespace PTL 266 } // namespace PTL 287 namespace PTL << 288 { << 289 template <typename Tp, typename Arg, intmax_t << 290 template <typename Func> << 291 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& << 292 : m_join{ std::forward<Func>(_join) } << 293 , m_pool{ _tp } << 294 { << 295 internal_update(); << 296 } << 297 << 298 template <typename Tp, typename Arg, intmax_t << 299 template <typename Up> << 300 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Thread << 301 enable << 302 : m_join{ []() {} } << 303 , m_pool{ _tp } << 304 { << 305 internal_update(); << 306 } << 307 << 308 // Destructor << 309 template <typename Tp, typename Arg, intmax_t << 310 TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup() << 311 { << 312 { << 313 // task will decrement counter and the << 314 // condition variable so acquiring loc << 315 // task group from being destroyed bef << 316 AutoLock _lk{ m_task_lock, std::defer_ << 317 if(!_lk.owns_lock()) << 318 _lk.lock(); << 319 } << 320 << 321 if(m_tbb_task_group) << 322 { << 323 auto* _arena = m_pool->get_task_arena( << 324 _arena->execute([this]() { this->m_tbb << 325 } << 326 delete m_tbb_task_group; << 327 this->clear(); << 328 } << 329 << 330 template <typename Tp, typename Arg, intmax_t << 331 template <typename Up> << 332 std::shared_ptr<Up> << 333 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std:: << 334 { << 335 // thread-safe increment of tasks in task << 336 operator++(); << 337 // copy the shared pointer to abstract ins << 338 m_task_list.push_back(_task); << 339 // return the derived instance << 340 return std::move(_task); << 341 } << 342 << 343 template <typename Tp, typename Arg, intmax_t << 344 void << 345 TaskGroup<Tp, Arg, MaxDepth>::wait() << 346 { << 347 auto _dtor = ScopeDestructor{ [&]() { << 348 if(m_tbb_task_group) << 349 { << 350 auto* _arena = m_pool->get_task_ar << 351 _arena->execute([this]() { this->m << 352 } << 353 } }; << 354 << 355 ThreadData* data = ThreadData::GetInstance << 356 if(!data) << 357 return; << 358 267 359 // if no pool was initially present at cre << 268 #include "PTL/TaskGroup.icc" 360 if(!m_pool) << 361 { << 362 // check for master MT run-manager << 363 m_pool = internal::get_default_threadp << 364 << 365 // if no thread pool created << 366 if(!m_pool) << 367 { << 368 if(f_verbose > 0) << 369 { << 370 fprintf(stderr, "%s @ %i :: Wa << 371 __FUNCTION__, __LINE__ << 372 std::cerr << __FUNCTION__ << " << 373 << "nullptr to threa << 374 } << 375 return; << 376 } << 377 } << 378 << 379 ThreadPool* tpool = (m_pool) ? m_pool << 380 VUserTaskQueue* taskq = (tpool) ? tpool->g << 381 << 382 bool _is_main = data->is_main; << 383 bool _within_task = data->within_task; << 384 << 385 auto is_active_state = [&]() { << 386 return (tpool->state()->load(std::memo << 387 thread_pool::state::STOPPED); << 388 }; << 389 << 390 auto execute_this_threads_tasks = [&]() { << 391 if(!taskq) << 392 return; << 393 << 394 // only want to process if within a ta << 395 if((!_is_main || tpool->size() < 2) && << 396 { << 397 int bin = static_cast<int>(taskq-> << 398 // const auto nitr = (tpool) ? tpo << 399 // Thread::hardware_concurrency(); << 400 while(this->pending() > 0) << 401 { << 402 if(!taskq->empty()) << 403 { << 404 auto _task = taskq->GetTas << 405 if(_task) << 406 (*_task)(); << 407 } << 408 } << 409 } << 410 }; << 411 << 412 // checks for validity << 413 if(!is_native_task_group()) << 414 { << 415 // for external threads << 416 if(!_is_main || tpool->size() < 2) << 417 return; << 418 } << 419 else if(f_verbose > 0) << 420 { << 421 if(!tpool || !taskq) << 422 { << 423 // something is wrong, didn't crea << 424 fprintf(stderr, << 425 "%s @ %i :: Warning! nullp << 426 "(%p)\n", << 427 __FUNCTION__, __LINE__, st << 428 static_cast<void*>(taskq)) << 429 } << 430 // return if thread pool isn't built << 431 else if(is_native_task_group() && !tpo << 432 { << 433 fprintf(stderr, "%s @ %i :: Warnin << 434 __FUNCTION__, __LINE__); << 435 } << 436 else if(!is_active_state()) << 437 { << 438 fprintf(stderr, "%s @ %i :: Warnin << 439 __FUNCTION__, __LINE__); << 440 } << 441 } << 442 << 443 intmax_t wake_size = 2; << 444 AutoLock _lock(m_task_lock, std::defer_loc << 445 << 446 while(is_active_state()) << 447 { << 448 execute_this_threads_tasks(); << 449 << 450 // while loop protects against spuriou << 451 while(_is_main && pending() > 0 && is_ << 452 { << 453 // auto _wake = [&]() { return (wa << 454 // !is_active_state()); << 455 // }; << 456 << 457 // lock before sleeping on conditi << 458 if(!_lock.owns_lock()) << 459 _lock.lock(); << 460 << 461 // Wait until signaled that a task << 462 // Unlock mutex while wait, then l << 463 // when true, this wakes the threa << 464 if(pending() >= wake_size) << 465 { << 466 m_task_cond.wait(_lock); << 467 } << 468 else << 469 { << 470 m_task_cond.wait_for(_lock, st << 471 } << 472 // unlock << 473 if(_lock.owns_lock()) << 474 _lock.unlock(); << 475 } << 476 << 477 // if pending is not greater than zero << 478 if(pending() <= 0) << 479 break; << 480 } << 481 << 482 if(_lock.owns_lock()) << 483 _lock.unlock(); << 484 << 485 intmax_t ntask = this->task_count().load() << 486 if(ntask > 0) << 487 { << 488 std::stringstream ss; << 489 ss << "\nWarning! Join operation issue << 490 << "are running!" << std::endl; << 491 std::cerr << ss.str(); << 492 this->wait(); << 493 } << 494 } << 495 << 496 template <typename Tp, typename Arg, intmax_t << 497 ScopeDestructor << 498 TaskGroup<Tp, Arg, MaxDepth>::get_scope_destru << 499 { << 500 auto& _counter = m_tot_task_count; << 501 auto& _task_cond = task_cond(); << 502 auto& _task_lock = task_lock(); << 503 return ScopeDestructor{ [&_task_cond, &_ta << 504 auto _count = --(_counter); << 505 if(_count < 1) << 506 { << 507 AutoLock _lk{ _task_lock }; << 508 _task_cond.notify_all(); << 509 } << 510 } }; << 511 } << 512 << 513 template <typename Tp, typename Arg, intmax_t << 514 void << 515 TaskGroup<Tp, Arg, MaxDepth>::notify() << 516 { << 517 AutoLock _lk{ m_task_lock }; << 518 m_task_cond.notify_one(); << 519 } << 520 << 521 template <typename Tp, typename Arg, intmax_t << 522 void << 523 TaskGroup<Tp, Arg, MaxDepth>::notify_all() << 524 { << 525 AutoLock _lk{ m_task_lock }; << 526 m_task_cond.notify_all(); << 527 } << 528 << 529 template <typename Tp, typename Arg, intmax_t << 530 template <typename Func, typename... Args, typ << 531 enable_if_t<std::is_void<Up>::value, void> << 532 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, << 533 { << 534 if(MaxDepth > 0 && !m_tbb_task_group && Th << 535 ThreadData::GetInstance()->task_depth > << 536 { << 537 local_exec<Tp>(std::move(func), std::m << 538 } << 539 else << 540 { << 541 auto& _counter = m_tot_task_count; << 542 auto& _task_cond = task_cond(); << 543 auto& _task_lock = task_lock(); << 544 auto _task = wrap([&_task_cond, << 545 auto* _tdata = ThreadData::GetInst << 546 if(_tdata) << 547 ++(_tdata->task_depth); << 548 func(args...); << 549 auto _count = --(_counter); << 550 if(_tdata) << 551 --(_tdata->task_depth); << 552 if(_count < 1) << 553 { << 554 AutoLock _lk{ _task_lock }; << 555 _task_cond.notify_all(); << 556 } << 557 }); << 558 << 559 if(m_tbb_task_group) << 560 { << 561 auto* _arena = m_pool->ge << 562 auto* _tbb_task_group = m_tbb_task << 563 auto* _ptask = _task.get( << 564 _arena->execute([_tbb_task_group, << 565 _tbb_task_group->run([_ptask]( << 566 }); << 567 } << 568 else << 569 { << 570 m_pool->add_task(std::move(_task)) << 571 } << 572 } << 573 } << 574 template <typename Tp, typename Arg, intmax_t << 575 template <typename Func, typename... Args, typ << 576 enable_if_t<!std::is_void<Up>::value, void> << 577 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, << 578 { << 579 if(MaxDepth > 0 && !m_tbb_task_group && Th << 580 ThreadData::GetInstance()->task_depth > << 581 { << 582 local_exec<Tp>(std::move(func), std::m << 583 } << 584 else << 585 { << 586 auto& _counter = m_tot_task_count; << 587 auto& _task_cond = task_cond(); << 588 auto& _task_lock = task_lock(); << 589 auto _task = wrap([&_task_cond, << 590 auto* _tdata = ThreadData::GetInst << 591 if(_tdata) << 592 ++(_tdata->task_depth); << 593 auto&& _ret = func(args...); << 594 auto _count = --(_counter); << 595 if(_tdata) << 596 --(_tdata->task_depth); << 597 if(_count < 1) << 598 { << 599 AutoLock _lk{ _task_lock }; << 600 _task_cond.notify_all(); << 601 } << 602 return std::forward<decltype(_ret) << 603 }); << 604 << 605 if(m_tbb_task_group) << 606 { << 607 auto* _arena = m_pool->ge << 608 auto* _tbb_task_group = m_tbb_task << 609 auto* _ptask = _task.get( << 610 _arena->execute([_tbb_task_group, << 611 _tbb_task_group->run([_ptask]( << 612 }); << 613 } << 614 else << 615 { << 616 m_pool->add_task(std::move(_task)) << 617 } << 618 } << 619 } << 620 << 621 template <typename Tp, typename Arg, intmax_t << 622 template <typename Up, typename Func, typename << 623 enable_if_t<std::is_void<Up>::value, void> << 624 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func << 625 { << 626 auto* _tdata = ThreadData::GetInstance(); << 627 if(_tdata) << 628 ++(_tdata->task_depth); << 629 promise_type _p{}; << 630 m_future_list.emplace_back(_p.get_future() << 631 func(args...); << 632 _p.set_value(); << 633 if(_tdata) << 634 --(_tdata->task_depth); << 635 } << 636 << 637 template <typename Tp, typename Arg, intmax_t << 638 template <typename Up, typename Func, typename << 639 enable_if_t<!std::is_void<Up>::value, void> << 640 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func << 641 { << 642 auto* _tdata = ThreadData::GetInstance(); << 643 if(_tdata) << 644 ++(_tdata->task_depth); << 645 promise_type _p{}; << 646 m_future_list.emplace_back(_p.get_future() << 647 _p.set_value(func(args...)); << 648 if(_tdata) << 649 --(_tdata->task_depth); << 650 } << 651 << 652 template <typename Tp, typename Arg, intmax_t << 653 template <typename Up, enable_if_t<!std::is_vo << 654 inline Up << 655 TaskGroup<Tp, Arg, MaxDepth>::join(Up accum) << 656 { << 657 this->wait(); << 658 for(auto& itr : m_task_list) << 659 { << 660 using RetT = decay_t<decltype(itr->get << 661 accum = std::move(m_join(std::ref << 662 } << 663 for(auto& itr : m_future_list) << 664 { << 665 using RetT = decay_t<decltype(itr.get( << 666 accum = std::move(m_join(std::ref << 667 } << 668 this->clear(); << 669 return accum; << 670 } << 671 << 672 template <typename Tp, typename Arg, intmax_t << 673 template <typename Up, typename Rp, << 674 enable_if_t<std::is_void<Up>::value << 675 inline void << 676 TaskGroup<Tp, Arg, MaxDepth>::join() << 677 { << 678 this->wait(); << 679 for(auto& itr : m_task_list) << 680 itr->get(); << 681 for(auto& itr : m_future_list) << 682 itr.get(); << 683 m_join(); << 684 this->clear(); << 685 } << 686 << 687 template <typename Tp, typename Arg, intmax_t << 688 template <typename Up, typename Rp, << 689 enable_if_t<std::is_void<Up>::value << 690 inline void << 691 TaskGroup<Tp, Arg, MaxDepth>::join() << 692 { << 693 this->wait(); << 694 for(auto& itr : m_task_list) << 695 { << 696 using RetT = decay_t<decltype(itr->get << 697 m_join(std::forward<RetT>(itr->get())) << 698 } << 699 for(auto& itr : m_future_list) << 700 { << 701 using RetT = decay_t<decltype(itr.get( << 702 m_join(std::forward<RetT>(itr.get())); << 703 } << 704 this->clear(); << 705 } << 706 << 707 template <typename Tp, typename Arg, intmax_t << 708 void << 709 TaskGroup<Tp, Arg, MaxDepth>::clear() << 710 { << 711 m_future_list.clear(); << 712 m_task_list.clear(); << 713 } << 714 << 715 template <typename Tp, typename Arg, intmax_t << 716 void << 717 TaskGroup<Tp, Arg, MaxDepth>::internal_update( << 718 { << 719 if(!m_pool) << 720 m_pool = internal::get_default_threadp << 721 << 722 if(!m_pool) << 723 { << 724 std::stringstream ss{}; << 725 ss << "[TaskGroup]> " << __FUNCTION__ << 726 << " :: nullptr to thread pool"; << 727 throw std::runtime_error(ss.str()); << 728 } << 729 << 730 if(m_pool->is_tbb_threadpool()) << 731 { << 732 m_tbb_task_group = new tbb_task_group_ << 733 } << 734 } << 735 << 736 template <typename Tp, typename Arg, intmax_t << 737 int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = << 738 << 739 } // namespace PTL << 740 269