Geant4 Cross Reference |
1 // 1 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 5 // of this software and associated documentati 6 // in the Software without restriction, includ 7 // to use, copy, modify, merge, publish, distr 8 // copies of the Software, and to permit perso 9 // furnished to do so, subject to the followin 10 // The above copyright notice and this permiss 11 // all copies or substantial portions of the S 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 18 // 19 // 20 // ------------------------------------------- 21 // Tasking class header file 22 // 23 // Class Description: 24 // 25 // This file creates the a class for handling 26 // can be independently joined 27 // 28 // ------------------------------------------- 29 // Author: Jonathan Madsen (Feb 13th 2018) 30 // ------------------------------------------- 31 32 #pragma once 33 34 #include "PTL/AutoLock.hh" 35 #ifndef G4GMAKE 36 #include "" 37 #endif 38 #include "PTL/JoinFunction.hh" 39 #include "PTL/ScopeDestructor.hh" 40 #include "PTL/Task.hh" 41 #include "PTL/ThreadData.hh" 42 #include "PTL/ThreadPool.hh" 43 #include "PTL/Types.hh" 44 #include "PTL/VTask.hh" 45 #include "PTL/VUserTaskQueue.hh" 46 #include "PTL/detail/CxxBackports.hh" 47 48 #include <atomic> 49 #include <chrono> 50 #include <cstdint> 51 #include <cstdio> 52 #include <functional> 53 #include <future> 54 #include <iostream> 55 #include <memory> 56 #include <mutex> 57 #include <sstream> // IWYU pragma: keep 58 #include <stdexcept> 59 #include <thread> 60 #include <type_traits> 61 #include <utility> 62 #include <vector> 63 64 #if defined(PTL_USE_TBB) 65 # include <tbb/task_group.h> // IWYU pragm 66 #endif 67 68 namespace PTL 69 { 70 namespace internal 71 { 72 std::atomic_uintmax_t& 73 task_group_counter(); 74 75 ThreadPool* 76 get_default_threadpool(); 77 78 intmax_t 79 get_task_depth(); 80 } // namespace internal 81 82 template <typename Tp, typename Arg = Tp, intm 83 class TaskGroup 84 { 85 public: 86 //---------------------------------------- 87 template <typename Up> 88 using container_type = std::vector<Up>; 89 90 using tid_type = std::thread 91 using size_type = uintmax_t; 92 using lock_t = Mutex; 93 using atomic_int = std::atomic 94 using atomic_uint = std::atomic 95 using condition_t = Condition; 96 using ArgTp = decay_t<Arg 97 using result_type = Tp; 98 using task_pointer = std::shared 99 using task_list_t = container_t 100 using this_type = TaskGroup<T 101 using promise_type = std::promis 102 using future_type = std::future 103 using packaged_task_type = std::packag 104 using future_list_t = container_t 105 using join_type = typename Jo 106 using iterator = typename fu 107 using reverse_iterator = typename fu 108 using const_iterator = typename fu 109 using const_reverse_iterator = typename fu 110 //---------------------------------------- 111 template <typename... Args> 112 using task_type = Task<ArgTp, decay_t<Args 113 //---------------------------------------- 114 115 public: 116 // Constructor 117 template <typename Func> 118 TaskGroup(Func&& _join, ThreadPool* _tp = 119 120 template <typename Up = Tp> 121 TaskGroup(ThreadPool* _tp = internal::get_ 122 enable_if_t<std::is_void<Up>::va 123 124 // Destructor 125 ~TaskGroup(); 126 127 // delete copy-construct 128 TaskGroup(const this_type&) = delete; 129 // define move-construct 130 // NOLINTNEXTLINE(performance-noexcept-mov 131 TaskGroup(this_type&& rhs) = default; 132 // delete copy-assign 133 TaskGroup& operator=(const this_type& rhs) 134 // define move-assign 135 // NOLINTNEXTLINE(performance-noexcept-mov 136 TaskGroup& operator=(this_type&& rhs) = de 137 138 public: 139 template <typename Up> 140 std::shared_ptr<Up> operator+=(std::shared 141 142 // wait to finish 143 void wait(); 144 145 // increment (prefix) 146 intmax_t operator++() { return ++(m_tot_ta 147 intmax_t operator++(int) { return (m_tot_t 148 intmax_t operator--() { return --(m_tot_ta 149 intmax_t operator--(int) { return (m_tot_t 150 151 // size 152 intmax_t size() const { return m_tot_task_ 153 154 // get the locks/conditions 155 lock_t& task_lock() { return m_task_l 156 condition_t& task_cond() { return m_task_c 157 158 // identifier 159 uintmax_t id() const { return m_id; } 160 161 // thread pool 162 void set_pool(ThreadPool* tp) { m_ 163 ThreadPool*& pool() { return m_pool; } 164 ThreadPool* pool() const { return m_pool; 165 166 bool is_native_task_group() const { return 167 bool is_main() const { return this_tid() = 168 169 // check if any tasks are still pending 170 intmax_t pending() { return m_tot_task_cou 171 172 static void set_verbose(int level) { f_ver 173 174 ScopeDestructor get_scope_destructor(); 175 176 void notify(); 177 void notify_all(); 178 179 void reserve(size_t _n) 180 { 181 m_task_list.reserve(_n); 182 m_future_list.reserve(_n); 183 } 184 185 public: 186 template <typename Func, typename... Args> 187 std::shared_ptr<task_type<Args...>> wrap(F 188 { 189 return operator+=(std::make_shared<tas 190 is_native_task_group(), m_depth, s 191 } 192 193 template <typename Func, typename... Args, 194 enable_if_t<std::is_void<Up>::value, void> 195 196 template <typename Func, typename... Args, 197 enable_if_t<!std::is_void<Up>::value, void 198 199 template <typename Func, typename... Args> 200 void run(Func func, Args... args) 201 { 202 exec(std::move(func), std::move(args). 203 } 204 205 protected: 206 template <typename Up, typename Func, type 207 enable_if_t<std::is_void<Up>::value, void> 208 209 template <typename Up, typename Func, type 210 enable_if_t<!std::is_void<Up>::value, void 211 212 // shorter typedefs 213 using itr_t = iterator; 214 using citr_t = const_iterator; 215 using ritr_t = reverse_iterator; 216 using critr_t = const_reverse_iterator; 217 218 public: 219 //---------------------------------------- 220 // Get tasks with non-void return types 221 // 222 future_list_t& get_tasks() { return 223 const future_list_t& get_tasks() const { r 224 225 //---------------------------------------- 226 // iterate over tasks with return type 227 // 228 itr_t begin() { return m_future_list.beg 229 itr_t end() { return m_future_list.end() 230 citr_t begin() const { return m_future_li 231 citr_t end() const { return m_future_list 232 citr_t cbegin() const { return m_future_l 233 citr_t cend() const { return m_future_lis 234 ritr_t rbegin() { return m_future_list.rb 235 ritr_t rend() { return m_future_list.rend 236 critr_t rbegin() const { return m_future_l 237 critr_t rend() const { return m_future_lis 238 239 //---------------------------------------- 240 // wait to finish 241 template <typename Up = Tp, enable_if_t<!s 242 inline Up join(Up accum = {}); 243 //---------------------------------------- 244 // wait to finish 245 template <typename Up = Tp, typename Rp = 246 enable_if_t<std::is_void<Up>::va 247 inline void join(); 248 //---------------------------------------- 249 // wait to finish 250 template <typename Up = Tp, typename Rp = 251 enable_if_t<std::is_void<Up>::va 252 inline void join(); 253 //---------------------------------------- 254 // clear the task result history 255 void clear(); 256 257 protected: 258 //---------------------------------------- 259 // get the thread id 260 static tid_type this_tid() { return std::t 261 262 //---------------------------------------- 263 // get the task count 264 atomic_int& task_count() { return m_ 265 const atomic_int& task_count() const { ret 266 267 protected: 268 static int f_verbose; 269 // Private variables 270 uintmax_t m_id = internal::t 271 intmax_t m_depth = internal::g 272 tid_type m_main_tid = std::this_t 273 atomic_int m_tot_task_count{ 0 }; 274 lock_t m_task_lock = {}; 275 condition_t m_task_cond = {}; 276 join_type m_join{}; 277 ThreadPool* m_pool = inter 278 tbb_task_group_t* m_tbb_task_group = nullp 279 task_list_t m_task_list = {}; 280 future_list_t m_future_list = {}; 281 282 private: 283 void internal_update(); 284 }; 285 286 } // namespace PTL 287 namespace PTL 288 { 289 template <typename Tp, typename Arg, intmax_t 290 template <typename Func> 291 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& 292 : m_join{ std::forward<Func>(_join) } 293 , m_pool{ _tp } 294 { 295 internal_update(); 296 } 297 298 template <typename Tp, typename Arg, intmax_t 299 template <typename Up> 300 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Thread 301 enable 302 : m_join{ []() {} } 303 , m_pool{ _tp } 304 { 305 internal_update(); 306 } 307 308 // Destructor 309 template <typename Tp, typename Arg, intmax_t 310 TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup() 311 { 312 { 313 // task will decrement counter and the 314 // condition variable so acquiring loc 315 // task group from being destroyed bef 316 AutoLock _lk{ m_task_lock, std::defer_ 317 if(!_lk.owns_lock()) 318 _lk.lock(); 319 } 320 321 if(m_tbb_task_group) 322 { 323 auto* _arena = m_pool->get_task_arena( 324 _arena->execute([this]() { this->m_tbb 325 } 326 delete m_tbb_task_group; 327 this->clear(); 328 } 329 330 template <typename Tp, typename Arg, intmax_t 331 template <typename Up> 332 std::shared_ptr<Up> 333 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std:: 334 { 335 // thread-safe increment of tasks in task 336 operator++(); 337 // copy the shared pointer to abstract ins 338 m_task_list.push_back(_task); 339 // return the derived instance 340 return std::move(_task); 341 } 342 343 template <typename Tp, typename Arg, intmax_t 344 void 345 TaskGroup<Tp, Arg, MaxDepth>::wait() 346 { 347 auto _dtor = ScopeDestructor{ [&]() { 348 if(m_tbb_task_group) 349 { 350 auto* _arena = m_pool->get_task_ar 351 _arena->execute([this]() { this->m 352 } 353 } }; 354 355 ThreadData* data = ThreadData::GetInstance 356 if(!data) 357 return; 358 359 // if no pool was initially present at cre 360 if(!m_pool) 361 { 362 // check for master MT run-manager 363 m_pool = internal::get_default_threadp 364 365 // if no thread pool created 366 if(!m_pool) 367 { 368 if(f_verbose > 0) 369 { 370 fprintf(stderr, "%s @ %i :: Wa 371 __FUNCTION__, __LINE__ 372 std::cerr << __FUNCTION__ << " 373 << "nullptr to threa 374 } 375 return; 376 } 377 } 378 379 ThreadPool* tpool = (m_pool) ? m_pool 380 VUserTaskQueue* taskq = (tpool) ? tpool->g 381 382 bool _is_main = data->is_main; 383 bool _within_task = data->within_task; 384 385 auto is_active_state = [&]() { 386 return (tpool->state()->load(std::memo 387 thread_pool::state::STOPPED); 388 }; 389 390 auto execute_this_threads_tasks = [&]() { 391 if(!taskq) 392 return; 393 394 // only want to process if within a ta 395 if((!_is_main || tpool->size() < 2) && 396 { 397 int bin = static_cast<int>(taskq-> 398 // const auto nitr = (tpool) ? tpo 399 // Thread::hardware_concurrency(); 400 while(this->pending() > 0) 401 { 402 if(!taskq->empty()) 403 { 404 auto _task = taskq->GetTas 405 if(_task) 406 (*_task)(); 407 } 408 } 409 } 410 }; 411 412 // checks for validity 413 if(!is_native_task_group()) 414 { 415 // for external threads 416 if(!_is_main || tpool->size() < 2) 417 return; 418 } 419 else if(f_verbose > 0) 420 { 421 if(!tpool || !taskq) 422 { 423 // something is wrong, didn't crea 424 fprintf(stderr, 425 "%s @ %i :: Warning! nullp 426 "(%p)\n", 427 __FUNCTION__, __LINE__, st 428 static_cast<void*>(taskq)) 429 } 430 // return if thread pool isn't built 431 else if(is_native_task_group() && !tpo 432 { 433 fprintf(stderr, "%s @ %i :: Warnin 434 __FUNCTION__, __LINE__); 435 } 436 else if(!is_active_state()) 437 { 438 fprintf(stderr, "%s @ %i :: Warnin 439 __FUNCTION__, __LINE__); 440 } 441 } 442 443 intmax_t wake_size = 2; 444 AutoLock _lock(m_task_lock, std::defer_loc 445 446 while(is_active_state()) 447 { 448 execute_this_threads_tasks(); 449 450 // while loop protects against spuriou 451 while(_is_main && pending() > 0 && is_ 452 { 453 // auto _wake = [&]() { return (wa 454 // !is_active_state()); 455 // }; 456 457 // lock before sleeping on conditi 458 if(!_lock.owns_lock()) 459 _lock.lock(); 460 461 // Wait until signaled that a task 462 // Unlock mutex while wait, then l 463 // when true, this wakes the threa 464 if(pending() >= wake_size) 465 { 466 m_task_cond.wait(_lock); 467 } 468 else 469 { 470 m_task_cond.wait_for(_lock, st 471 } 472 // unlock 473 if(_lock.owns_lock()) 474 _lock.unlock(); 475 } 476 477 // if pending is not greater than zero 478 if(pending() <= 0) 479 break; 480 } 481 482 if(_lock.owns_lock()) 483 _lock.unlock(); 484 485 intmax_t ntask = this->task_count().load() 486 if(ntask > 0) 487 { 488 std::stringstream ss; 489 ss << "\nWarning! Join operation issue 490 << "are running!" << std::endl; 491 std::cerr << ss.str(); 492 this->wait(); 493 } 494 } 495 496 template <typename Tp, typename Arg, intmax_t 497 ScopeDestructor 498 TaskGroup<Tp, Arg, MaxDepth>::get_scope_destru 499 { 500 auto& _counter = m_tot_task_count; 501 auto& _task_cond = task_cond(); 502 auto& _task_lock = task_lock(); 503 return ScopeDestructor{ [&_task_cond, &_ta 504 auto _count = --(_counter); 505 if(_count < 1) 506 { 507 AutoLock _lk{ _task_lock }; 508 _task_cond.notify_all(); 509 } 510 } }; 511 } 512 513 template <typename Tp, typename Arg, intmax_t 514 void 515 TaskGroup<Tp, Arg, MaxDepth>::notify() 516 { 517 AutoLock _lk{ m_task_lock }; 518 m_task_cond.notify_one(); 519 } 520 521 template <typename Tp, typename Arg, intmax_t 522 void 523 TaskGroup<Tp, Arg, MaxDepth>::notify_all() 524 { 525 AutoLock _lk{ m_task_lock }; 526 m_task_cond.notify_all(); 527 } 528 529 template <typename Tp, typename Arg, intmax_t 530 template <typename Func, typename... Args, typ 531 enable_if_t<std::is_void<Up>::value, void> 532 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, 533 { 534 if(MaxDepth > 0 && !m_tbb_task_group && Th 535 ThreadData::GetInstance()->task_depth > 536 { 537 local_exec<Tp>(std::move(func), std::m 538 } 539 else 540 { 541 auto& _counter = m_tot_task_count; 542 auto& _task_cond = task_cond(); 543 auto& _task_lock = task_lock(); 544 auto _task = wrap([&_task_cond, 545 auto* _tdata = ThreadData::GetInst 546 if(_tdata) 547 ++(_tdata->task_depth); 548 func(args...); 549 auto _count = --(_counter); 550 if(_tdata) 551 --(_tdata->task_depth); 552 if(_count < 1) 553 { 554 AutoLock _lk{ _task_lock }; 555 _task_cond.notify_all(); 556 } 557 }); 558 559 if(m_tbb_task_group) 560 { 561 auto* _arena = m_pool->ge 562 auto* _tbb_task_group = m_tbb_task 563 auto* _ptask = _task.get( 564 _arena->execute([_tbb_task_group, 565 _tbb_task_group->run([_ptask]( 566 }); 567 } 568 else 569 { 570 m_pool->add_task(std::move(_task)) 571 } 572 } 573 } 574 template <typename Tp, typename Arg, intmax_t 575 template <typename Func, typename... Args, typ 576 enable_if_t<!std::is_void<Up>::value, void> 577 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, 578 { 579 if(MaxDepth > 0 && !m_tbb_task_group && Th 580 ThreadData::GetInstance()->task_depth > 581 { 582 local_exec<Tp>(std::move(func), std::m 583 } 584 else 585 { 586 auto& _counter = m_tot_task_count; 587 auto& _task_cond = task_cond(); 588 auto& _task_lock = task_lock(); 589 auto _task = wrap([&_task_cond, 590 auto* _tdata = ThreadData::GetInst 591 if(_tdata) 592 ++(_tdata->task_depth); 593 auto&& _ret = func(args...); 594 auto _count = --(_counter); 595 if(_tdata) 596 --(_tdata->task_depth); 597 if(_count < 1) 598 { 599 AutoLock _lk{ _task_lock }; 600 _task_cond.notify_all(); 601 } 602 return std::forward<decltype(_ret) 603 }); 604 605 if(m_tbb_task_group) 606 { 607 auto* _arena = m_pool->ge 608 auto* _tbb_task_group = m_tbb_task 609 auto* _ptask = _task.get( 610 _arena->execute([_tbb_task_group, 611 _tbb_task_group->run([_ptask]( 612 }); 613 } 614 else 615 { 616 m_pool->add_task(std::move(_task)) 617 } 618 } 619 } 620 621 template <typename Tp, typename Arg, intmax_t 622 template <typename Up, typename Func, typename 623 enable_if_t<std::is_void<Up>::value, void> 624 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func 625 { 626 auto* _tdata = ThreadData::GetInstance(); 627 if(_tdata) 628 ++(_tdata->task_depth); 629 promise_type _p{}; 630 m_future_list.emplace_back(_p.get_future() 631 func(args...); 632 _p.set_value(); 633 if(_tdata) 634 --(_tdata->task_depth); 635 } 636 637 template <typename Tp, typename Arg, intmax_t 638 template <typename Up, typename Func, typename 639 enable_if_t<!std::is_void<Up>::value, void> 640 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func 641 { 642 auto* _tdata = ThreadData::GetInstance(); 643 if(_tdata) 644 ++(_tdata->task_depth); 645 promise_type _p{}; 646 m_future_list.emplace_back(_p.get_future() 647 _p.set_value(func(args...)); 648 if(_tdata) 649 --(_tdata->task_depth); 650 } 651 652 template <typename Tp, typename Arg, intmax_t 653 template <typename Up, enable_if_t<!std::is_vo 654 inline Up 655 TaskGroup<Tp, Arg, MaxDepth>::join(Up accum) 656 { 657 this->wait(); 658 for(auto& itr : m_task_list) 659 { 660 using RetT = decay_t<decltype(itr->get 661 accum = std::move(m_join(std::ref 662 } 663 for(auto& itr : m_future_list) 664 { 665 using RetT = decay_t<decltype(itr.get( 666 accum = std::move(m_join(std::ref 667 } 668 this->clear(); 669 return accum; 670 } 671 672 template <typename Tp, typename Arg, intmax_t 673 template <typename Up, typename Rp, 674 enable_if_t<std::is_void<Up>::value 675 inline void 676 TaskGroup<Tp, Arg, MaxDepth>::join() 677 { 678 this->wait(); 679 for(auto& itr : m_task_list) 680 itr->get(); 681 for(auto& itr : m_future_list) 682 itr.get(); 683 m_join(); 684 this->clear(); 685 } 686 687 template <typename Tp, typename Arg, intmax_t 688 template <typename Up, typename Rp, 689 enable_if_t<std::is_void<Up>::value 690 inline void 691 TaskGroup<Tp, Arg, MaxDepth>::join() 692 { 693 this->wait(); 694 for(auto& itr : m_task_list) 695 { 696 using RetT = decay_t<decltype(itr->get 697 m_join(std::forward<RetT>(itr->get())) 698 } 699 for(auto& itr : m_future_list) 700 { 701 using RetT = decay_t<decltype(itr.get( 702 m_join(std::forward<RetT>(itr.get())); 703 } 704 this->clear(); 705 } 706 707 template <typename Tp, typename Arg, intmax_t 708 void 709 TaskGroup<Tp, Arg, MaxDepth>::clear() 710 { 711 m_future_list.clear(); 712 m_task_list.clear(); 713 } 714 715 template <typename Tp, typename Arg, intmax_t 716 void 717 TaskGroup<Tp, Arg, MaxDepth>::internal_update( 718 { 719 if(!m_pool) 720 m_pool = internal::get_default_threadp 721 722 if(!m_pool) 723 { 724 std::stringstream ss{}; 725 ss << "[TaskGroup]> " << __FUNCTION__ 726 << " :: nullptr to thread pool"; 727 throw std::runtime_error(ss.str()); 728 } 729 730 if(m_pool->is_tbb_threadpool()) 731 { 732 m_tbb_task_group = new tbb_task_group_ 733 } 734 } 735 736 template <typename Tp, typename Arg, intmax_t 737 int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = 738 739 } // namespace PTL 740