Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/include/PTL/TaskGroup.hh

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3, ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

  1 //
  2 // MIT License
  3 // Copyright (c) 2020 Jonathan R. Madsen
  4 // Permission is hereby granted, free of charge, to any person obtaining a copy
  5 // of this software and associated documentation files (the "Software"), to deal
  6 // in the Software without restriction, including without limitation the rights
  7 // to use, copy, modify, merge, publish, distribute, sublicense, and
  8 // copies of the Software, and to permit persons to whom the Software is
  9 // furnished to do so, subject to the following conditions:
 10 // The above copyright notice and this permission notice shall be included in
 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 18 //
 19 //
 20 // ---------------------------------------------------------------
 21 // Tasking class header file
 22 //
 23 // Class Description:
 24 //
 25 // This file creates the a class for handling a group of tasks that
 26 // can be independently joined
 27 //
 28 // ---------------------------------------------------------------
 29 // Author: Jonathan Madsen (Feb 13th 2018)
 30 // ---------------------------------------------------------------
 31 
 32 #pragma once
 33 
 34 #include "PTL/AutoLock.hh"
 35 #ifndef G4GMAKE
#include "PTL/Config.hh"
 37 #endif
 38 #include "PTL/JoinFunction.hh"
 39 #include "PTL/ScopeDestructor.hh"
 40 #include "PTL/Task.hh"
 41 #include "PTL/ThreadData.hh"
 42 #include "PTL/ThreadPool.hh"
 43 #include "PTL/Types.hh"
 44 #include "PTL/VTask.hh"
 45 #include "PTL/VUserTaskQueue.hh"
 46 #include "PTL/detail/CxxBackports.hh"
 47 
 48 #include <atomic>
 49 #include <chrono>
 50 #include <cstdint>
 51 #include <cstdio>
 52 #include <functional>
 53 #include <future>
 54 #include <iostream>
 55 #include <memory>
 56 #include <mutex>
 57 #include <sstream>  // IWYU pragma: keep
 58 #include <stdexcept>
 59 #include <thread>
 60 #include <type_traits>
 61 #include <utility>
 62 #include <vector>
 63 
 64 #if defined(PTL_USE_TBB)
 65 #    include <tbb/task_group.h>  // IWYU pragma: keep
 66 #endif
 67 
 68 namespace PTL
 69 {
namespace internal
{
// Monotonic counter used to assign a unique id to each TaskGroup instance
// (see TaskGroup::m_id).
std::atomic_uintmax_t&
task_group_counter();

// Thread-pool used when a TaskGroup is constructed without an explicit pool.
ThreadPool*
get_default_threadpool();

// Task depth of the calling thread, used to seed TaskGroup::m_depth.
intmax_t
get_task_depth();
}  // namespace internal
 81 
// Handle for a group of tasks that can be joined independently of any other
// tasks submitted to the same thread-pool.
//
// Template parameters:
//   Tp       - the joined (accumulated) result type
//   Arg      - the return type of the individual tasks (defaults to Tp)
//   MaxDepth - when > 0 and the submitting thread's task depth exceeds this
//              value, exec() runs the task inline via local_exec() instead of
//              enqueueing it (native task-groups only)
template <typename Tp, typename Arg = Tp, intmax_t MaxDepth = 0>
class TaskGroup
{
public:
    //------------------------------------------------------------------------//
    template <typename Up>
    using container_type = std::vector<Up>;

    using tid_type               = std::thread::id;
    using size_type              = uintmax_t;
    using lock_t                 = Mutex;
    using atomic_int             = std::atomic_intmax_t;
    using atomic_uint            = std::atomic_uintmax_t;
    using condition_t            = Condition;
    using ArgTp                  = decay_t<Arg>;
    using result_type            = Tp;
    using task_pointer           = std::shared_ptr<TaskFuture<ArgTp>>;
    using task_list_t            = container_type<task_pointer>;
    using this_type              = TaskGroup<Tp, Arg, MaxDepth>;
    using promise_type           = std::promise<ArgTp>;
    using future_type            = std::future<ArgTp>;
    using packaged_task_type     = std::packaged_task<ArgTp()>;
    using future_list_t          = container_type<future_type>;
    using join_type              = typename JoinFunction<Tp, Arg>::Type;
    using iterator               = typename future_list_t::iterator;
    using reverse_iterator       = typename future_list_t::reverse_iterator;
    using const_iterator         = typename future_list_t::const_iterator;
    using const_reverse_iterator = typename future_list_t::const_reverse_iterator;
    //------------------------------------------------------------------------//
    template <typename... Args>
    using task_type = Task<ArgTp, decay_t<Args>...>;
    //------------------------------------------------------------------------//

public:
    // Constructor taking the join functor used to accumulate task results
    template <typename Func>
    TaskGroup(Func&& _join, ThreadPool* _tp = internal::get_default_threadpool());

    // Constructor available only when Tp is void (join functor is a no-op)
    template <typename Up = Tp>
    TaskGroup(ThreadPool* _tp = internal::get_default_threadpool(),
              enable_if_t<std::is_void<Up>::value, int> = 0);

    // Destructor
    ~TaskGroup();

    // delete copy-construct
    TaskGroup(const this_type&) = delete;
    // define move-construct
    // NOLINTNEXTLINE(performance-noexcept-move-constructor)
    TaskGroup(this_type&& rhs) = default;
    // delete copy-assign
    TaskGroup& operator=(const this_type& rhs) = delete;
    // define move-assign
    // NOLINTNEXTLINE(performance-noexcept-move-constructor)
    TaskGroup& operator=(this_type&& rhs) = default;

public:
    // register a task with the group: increments the pending counter and
    // stores the shared pointer in m_task_list
    template <typename Up>
    std::shared_ptr<Up> operator+=(std::shared_ptr<Up>&& _task);

    // wait to finish
    void wait();

    // increment/decrement the pending-task counter (prefix and postfix)
    intmax_t operator++() { return ++(m_tot_task_count); }
    intmax_t operator++(int) { return (m_tot_task_count)++; }
    intmax_t operator--() { return --(m_tot_task_count); }
    intmax_t operator--(int) { return (m_tot_task_count)--; }

    // size (number of tasks not yet completed)
    intmax_t size() const { return m_tot_task_count.load(); }

    // get the locks/conditions
    lock_t&      task_lock() { return m_task_lock; }
    condition_t& task_cond() { return m_task_cond; }

    // identifier (unique per TaskGroup instance)
    uintmax_t id() const { return m_id; }

    // thread pool
    void         set_pool(ThreadPool* tp) { m_pool = tp; }
    ThreadPool*& pool() { return m_pool; }
    ThreadPool*  pool() const { return m_pool; }

    // "native" means not backed by a TBB task group
    bool is_native_task_group() const { return (m_tbb_task_group) == nullptr; }
    bool is_main() const { return this_tid() == m_main_tid; }

    // check if any tasks are still pending
    intmax_t pending() { return m_tot_task_count.load(); }

    static void set_verbose(int level) { f_verbose = level; }

    // guard object that decrements the pending counter on scope exit
    ScopeDestructor get_scope_destructor();

    // wake one / all threads blocked in wait()
    void notify();
    void notify_all();

    // pre-allocate storage for _n tasks/futures
    void reserve(size_t _n)
    {
        m_task_list.reserve(_n);
        m_future_list.reserve(_n);
    }

public:
    // wrap a callable + arguments into a task object and register it
    template <typename Func, typename... Args>
    std::shared_ptr<task_type<Args...>> wrap(Func func, Args... args)
    {
        return operator+=(std::make_shared<task_type<Args...>>(
            is_native_task_group(), m_depth, std::move(func), std::move(args)...));
    }

    // submit a task for execution (void result type)
    template <typename Func, typename... Args, typename Up = Tp>
    enable_if_t<std::is_void<Up>::value, void> exec(Func func, Args... args);

    // submit a task for execution (non-void result type)
    template <typename Func, typename... Args, typename Up = Tp>
    enable_if_t<!std::is_void<Up>::value, void> exec(Func func, Args... args);

    // convenience alias for exec()
    template <typename Func, typename... Args>
    void run(Func func, Args... args)
    {
        exec(std::move(func), std::move(args)...);
    }

protected:
    // run a task inline on the calling thread (used when MaxDepth exceeded)
    template <typename Up, typename Func, typename... Args>
    enable_if_t<std::is_void<Up>::value, void> local_exec(Func func, Args... args);

    template <typename Up, typename Func, typename... Args>
    enable_if_t<!std::is_void<Up>::value, void> local_exec(Func func, Args... args);

    // shorter typedefs
    using itr_t   = iterator;
    using citr_t  = const_iterator;
    using ritr_t  = reverse_iterator;
    using critr_t = const_reverse_iterator;

public:
    //------------------------------------------------------------------------//
    // Get tasks with non-void return types
    //
    future_list_t&       get_tasks() { return m_future_list; }
    const future_list_t& get_tasks() const { return m_future_list; }

    //------------------------------------------------------------------------//
    // iterate over tasks with return type
    //
    itr_t   begin() { return m_future_list.begin(); }
    itr_t   end() { return m_future_list.end(); }
    citr_t  begin() const { return m_future_list.begin(); }
    citr_t  end() const { return m_future_list.end(); }
    citr_t  cbegin() const { return m_future_list.begin(); }
    citr_t  cend() const { return m_future_list.end(); }
    ritr_t  rbegin() { return m_future_list.rbegin(); }
    ritr_t  rend() { return m_future_list.rend(); }
    critr_t rbegin() const { return m_future_list.rbegin(); }
    critr_t rend() const { return m_future_list.rend(); }

    //------------------------------------------------------------------------//
    // wait to finish, then accumulate all task results into `accum`
    template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value, int> = 0>
    inline Up join(Up accum = {});
    //------------------------------------------------------------------------//
    // wait to finish (void result, void task return)
    template <typename Up = Tp, typename Rp = Arg,
              enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int> = 0>
    inline void join();
    //------------------------------------------------------------------------//
    // wait to finish (void result, non-void task return consumed by m_join)
    template <typename Up = Tp, typename Rp = Arg,
              enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int> = 0>
    inline void join();
    //------------------------------------------------------------------------//
    // clear the task result history
    void clear();

protected:
    //------------------------------------------------------------------------//
    // get the thread id
    static tid_type this_tid() { return std::this_thread::get_id(); }

    //------------------------------------------------------------------------//
    // get the task count
    atomic_int&       task_count() { return m_tot_task_count; }
    const atomic_int& task_count() const { return m_tot_task_count; }

protected:
    static int f_verbose;
    // Private variables
    uintmax_t         m_id       = internal::task_group_counter()++;  // unique id
    intmax_t          m_depth    = internal::get_task_depth();  // depth at creation
    tid_type          m_main_tid = std::this_thread::get_id();  // creating thread
    atomic_int        m_tot_task_count{ 0 };  // tasks submitted but not completed
    lock_t            m_task_lock = {};  // guards m_task_cond
    condition_t       m_task_cond = {};  // signaled as tasks complete
    join_type         m_join{};          // accumulates task results in join()
    ThreadPool*       m_pool           = internal::get_default_threadpool();
    tbb_task_group_t* m_tbb_task_group = nullptr;  // owned; allocated in internal_update()
    task_list_t       m_task_list      = {};  // handles to submitted tasks
    future_list_t     m_future_list    = {};  // futures from local_exec()

private:
    void internal_update();
};
285 
286 }  // namespace PTL
287 namespace PTL
288 {
// Constructor: stores the join functor used to accumulate task results and
// the thread-pool to submit to, then resolves the pool / TBB backing via
// internal_update().
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Func>
TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& _join, ThreadPool* _tp)
: m_join{ std::forward<Func>(_join) }
, m_pool{ _tp }
{
    internal_update();
}
297 
// Constructor available only when Tp is void: the join functor is a no-op
// lambda since there is no result to accumulate.
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up>
TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(ThreadPool* _tp,
                                        enable_if_t<std::is_void<Up>::value, int>)
: m_join{ []() {} }
, m_pool{ _tp }
{
    internal_update();
}
307 
308 // Destructor
// Destructor: synchronizes with in-flight task completions, drains the TBB
// task group (if any), and releases the owned tbb_task_group_t allocated in
// internal_update().
template <typename Tp, typename Arg, intmax_t MaxDepth>
TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup()
{
    {
        // task will decrement counter and then acquire the lock to notify
        // condition variable so acquiring lock here will prevent the
        // task group from being destroyed before this is completed
        AutoLock _lk{ m_task_lock, std::defer_lock };
        if(!_lk.owns_lock())
            _lk.lock();
    }

    if(m_tbb_task_group)
    {
        // block until all tasks submitted to the TBB task group have finished
        auto* _arena = m_pool->get_task_arena();
        _arena->execute([this]() { this->m_tbb_task_group->wait(); });
    }
    // deleting nullptr is a no-op for native task groups
    delete m_tbb_task_group;
    this->clear();
}
329 
330 template <typename Tp, typename Arg, intmax_t MaxDepth>
331 template <typename Up>
332 std::shared_ptr<Up>
333 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std::shared_ptr<Up>&& _task)
334 {
335     // thread-safe increment of tasks in task group
336     operator++();
337     // copy the shared pointer to abstract instance
338     m_task_list.push_back(_task);
339     // return the derived instance
340     return std::move(_task);
341 }
342 
// Block until every task registered with this group has completed.
//
// Worker threads calling wait() from inside a task help drain the queue
// instead of sleeping; the main thread sleeps on the condition variable,
// which task completions signal (see exec()). Returns early (with an
// optional warning) if no thread pool / task queue is available.
template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::wait()
{
    // on scope exit: drain the TBB task group when this group is TBB-backed
    auto _dtor = ScopeDestructor{ [&]() {
        if(m_tbb_task_group)
        {
            auto* _arena = m_pool->get_task_arena();
            _arena->execute([this]() { this->m_tbb_task_group->wait(); });
        }
    } };

    ThreadData* data = ThreadData::GetInstance();
    if(!data)
        return;

    // if no pool was initially present at creation
    if(!m_pool)
    {
        // check for master MT run-manager
        m_pool = internal::get_default_threadpool();

        // if no thread pool created
        if(!m_pool)
        {
            if(f_verbose > 0)
            {
                fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
                        __FUNCTION__, __LINE__, static_cast<void*>(m_pool));
                std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
                          << "nullptr to thread pool!" << std::endl;
            }
            return;
        }
    }

    ThreadPool*     tpool = (m_pool) ? m_pool : data->thread_pool;
    VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue;

    bool _is_main     = data->is_main;
    bool _within_task = data->within_task;

    // true while the thread-pool has not been stopped
    auto is_active_state = [&]() {
        return (tpool->state()->load(std::memory_order_relaxed) !=
                thread_pool::state::STOPPED);
    };

    // worker threads execute queued tasks instead of blocking
    auto execute_this_threads_tasks = [&]() {
        if(!taskq)
            return;

        // only want to process if within a task
        if((!_is_main || tpool->size() < 2) && _within_task)
        {
            int bin = static_cast<int>(taskq->GetThreadBin());
            // const auto nitr = (tpool) ? tpool->size() :
            // Thread::hardware_concurrency();
            while(this->pending() > 0)
            {
                if(!taskq->empty())
                {
                    auto _task = taskq->GetTask(bin);
                    if(_task)
                        (*_task)();
                }
            }
        }
    };

    // checks for validity
    if(!is_native_task_group())
    {
        // for external threads
        if(!_is_main || tpool->size() < 2)
            return;
    }
    else if(f_verbose > 0)
    {
        if(!tpool || !taskq)
        {
            // something is wrong, didn't create thread-pool?
            fprintf(stderr,
                    "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
                    "(%p)\n",
                    __FUNCTION__, __LINE__, static_cast<void*>(tpool),
                    static_cast<void*>(taskq));
        }
        // return if thread pool isn't built
        else if(is_native_task_group() && !tpool->is_alive())
        {
            fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n",
                    __FUNCTION__, __LINE__);
        }
        else if(!is_active_state())
        {
            fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n",
                    __FUNCTION__, __LINE__);
        }
    }

    intmax_t wake_size = 2;
    AutoLock _lock(m_task_lock, std::defer_lock);

    while(is_active_state())
    {
        execute_this_threads_tasks();

        // while loop protects against spurious wake-ups
        while(_is_main && pending() > 0 && is_active_state())
        {
            // auto _wake = [&]() { return (wake_size > pending() ||
            // !is_active_state());
            // };

            // lock before sleeping on condition
            if(!_lock.owns_lock())
                _lock.lock();

            // Wait until signaled that a task has been competed
            // Unlock mutex while wait, then lock it back when signaled
            // when true, this wakes the thread
            if(pending() >= wake_size)
            {
                m_task_cond.wait(_lock);
            }
            else
            {
                // near completion: poll with a timeout to avoid a missed wake
                m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
            }
            // unlock
            if(_lock.owns_lock())
                _lock.unlock();
        }

        // if pending is not greater than zero, we are joined
        if(pending() <= 0)
            break;
    }

    if(_lock.owns_lock())
        _lock.unlock();

    // defensive re-check: if tasks are somehow still outstanding, warn and
    // wait again
    intmax_t ntask = this->task_count().load();
    if(ntask > 0)
    {
        std::stringstream ss;
        ss << "\nWarning! Join operation issue! " << ntask << " tasks still "
           << "are running!" << std::endl;
        std::cerr << ss.str();
        this->wait();
    }
}
495 
// Return a guard object whose destructor performs the same completion
// protocol as the tasks wrapped by exec(): decrement the pending-task
// counter and, when it reaches zero, wake every thread blocked in wait().
// NOTE: the returned lambda captures members by reference, so the guard
// must not outlive this TaskGroup.
template <typename Tp, typename Arg, intmax_t MaxDepth>
ScopeDestructor
TaskGroup<Tp, Arg, MaxDepth>::get_scope_destructor()
{
    auto& _counter   = m_tot_task_count;
    auto& _task_cond = task_cond();
    auto& _task_lock = task_lock();
    return ScopeDestructor{ [&_task_cond, &_task_lock, &_counter]() {
        auto _count = --(_counter);
        if(_count < 1)
        {
            // last task: notify under the lock so wait() cannot miss it
            AutoLock _lk{ _task_lock };
            _task_cond.notify_all();
        }
    } };
}
512 
513 template <typename Tp, typename Arg, intmax_t MaxDepth>
514 void
515 TaskGroup<Tp, Arg, MaxDepth>::notify()
516 {
517     AutoLock _lk{ m_task_lock };
518     m_task_cond.notify_one();
519 }
520 
521 template <typename Tp, typename Arg, intmax_t MaxDepth>
522 void
523 TaskGroup<Tp, Arg, MaxDepth>::notify_all()
524 {
525     AutoLock _lk{ m_task_lock };
526     m_task_cond.notify_all();
527 }
528 
// Submit a task whose result type is void.
//
// If MaxDepth is exceeded on a native (non-TBB) group, the task runs inline
// on the calling thread. Otherwise the callable is wrapped so that, on
// completion, it decrements the pending counter and notifies wait() when it
// was the last outstanding task, then it is handed to the TBB task group or
// the native thread-pool.
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Func, typename... Args, typename Up>
enable_if_t<std::is_void<Up>::value, void>
TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
{
    if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
       ThreadData::GetInstance()->task_depth > MaxDepth)
    {
        // depth limit reached: execute directly on this thread
        local_exec<Tp>(std::move(func), std::move(args)...);
    }
    else
    {
        // NOTE: the wrapper captures members by reference; the task must not
        // outlive this TaskGroup
        auto& _counter   = m_tot_task_count;
        auto& _task_cond = task_cond();
        auto& _task_lock = task_lock();
        auto  _task      = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
            auto* _tdata = ThreadData::GetInstance();
            if(_tdata)
                ++(_tdata->task_depth);
            func(args...);
            auto _count = --(_counter);
            if(_tdata)
                --(_tdata->task_depth);
            if(_count < 1)
            {
                // last task completed: wake any thread blocked in wait()
                AutoLock _lk{ _task_lock };
                _task_cond.notify_all();
            }
        });

        if(m_tbb_task_group)
        {
            // run via the TBB task group inside the pool's task arena
            auto* _arena          = m_pool->get_task_arena();
            auto* _tbb_task_group = m_tbb_task_group;
            auto* _ptask          = _task.get();
            _arena->execute([_tbb_task_group, _ptask]() {
                _tbb_task_group->run([_ptask]() { (*_ptask)(); });
            });
        }
        else
        {
            // native path: enqueue on the thread-pool
            m_pool->add_task(std::move(_task));
        }
    }
}
// Submit a task whose result type is non-void. Identical to the void
// overload except that the callable's return value is forwarded out of the
// wrapper so it becomes available through the task's future in join().
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Func, typename... Args, typename Up>
enable_if_t<!std::is_void<Up>::value, void>
TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
{
    if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
       ThreadData::GetInstance()->task_depth > MaxDepth)
    {
        // depth limit reached: execute directly on this thread
        local_exec<Tp>(std::move(func), std::move(args)...);
    }
    else
    {
        // NOTE: the wrapper captures members by reference; the task must not
        // outlive this TaskGroup
        auto& _counter   = m_tot_task_count;
        auto& _task_cond = task_cond();
        auto& _task_lock = task_lock();
        auto  _task      = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
            auto* _tdata = ThreadData::GetInstance();
            if(_tdata)
                ++(_tdata->task_depth);
            auto&& _ret   = func(args...);
            auto   _count = --(_counter);
            if(_tdata)
                --(_tdata->task_depth);
            if(_count < 1)
            {
                // last task completed: wake any thread blocked in wait()
                AutoLock _lk{ _task_lock };
                _task_cond.notify_all();
            }
            // forward the result into the packaged task's promise
            return std::forward<decltype(_ret)>(_ret);
        });

        if(m_tbb_task_group)
        {
            // run via the TBB task group inside the pool's task arena
            auto* _arena          = m_pool->get_task_arena();
            auto* _tbb_task_group = m_tbb_task_group;
            auto* _ptask          = _task.get();
            _arena->execute([_tbb_task_group, _ptask]() {
                _tbb_task_group->run([_ptask]() { (*_ptask)(); });
            });
        }
        else
        {
            // native path: enqueue on the thread-pool
            m_pool->add_task(std::move(_task));
        }
    }
}
620 
621 template <typename Tp, typename Arg, intmax_t MaxDepth>
622 template <typename Up, typename Func, typename... Args>
623 enable_if_t<std::is_void<Up>::value, void>
624 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
625 {
626     auto* _tdata = ThreadData::GetInstance();
627     if(_tdata)
628         ++(_tdata->task_depth);
629     promise_type _p{};
630     m_future_list.emplace_back(_p.get_future());
631     func(args...);
632     _p.set_value();
633     if(_tdata)
634         --(_tdata->task_depth);
635 }
636 
637 template <typename Tp, typename Arg, intmax_t MaxDepth>
638 template <typename Up, typename Func, typename... Args>
639 enable_if_t<!std::is_void<Up>::value, void>
640 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
641 {
642     auto* _tdata = ThreadData::GetInstance();
643     if(_tdata)
644         ++(_tdata->task_depth);
645     promise_type _p{};
646     m_future_list.emplace_back(_p.get_future());
647     _p.set_value(func(args...));
648     if(_tdata)
649         --(_tdata->task_depth);
650 }
651 
// Wait for all tasks, then fold every task result into `accum` via the
// join functor supplied at construction, first for pool-executed tasks
// (m_task_list) and then for inline-executed ones (m_future_list).
// Clears the task history before returning the accumulated value.
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up, enable_if_t<!std::is_void<Up>::value, int>>
inline Up
TaskGroup<Tp, Arg, MaxDepth>::join(Up accum)
{
    this->wait();
    for(auto& itr : m_task_list)
    {
        using RetT = decay_t<decltype(itr->get())>;
        accum      = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get())));
    }
    for(auto& itr : m_future_list)
    {
        using RetT = decay_t<decltype(itr.get())>;
        accum      = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get())));
    }
    this->clear();
    return accum;
}
671 
672 template <typename Tp, typename Arg, intmax_t MaxDepth>
673 template <typename Up, typename Rp,
674           enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int>>
675 inline void
676 TaskGroup<Tp, Arg, MaxDepth>::join()
677 {
678     this->wait();
679     for(auto& itr : m_task_list)
680         itr->get();
681     for(auto& itr : m_future_list)
682         itr.get();
683     m_join();
684     this->clear();
685 }
686 
// Wait for all tasks (void result, non-void task return): pass each task's
// result to the join functor (no accumulation), then clear the history.
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up, typename Rp,
          enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int>>
inline void
TaskGroup<Tp, Arg, MaxDepth>::join()
{
    this->wait();
    for(auto& itr : m_task_list)
    {
        using RetT = decay_t<decltype(itr->get())>;
        m_join(std::forward<RetT>(itr->get()));
    }
    for(auto& itr : m_future_list)
    {
        using RetT = decay_t<decltype(itr.get())>;
        m_join(std::forward<RetT>(itr.get()));
    }
    this->clear();
}
706 
// Discard the stored task handles and futures (the task result history).
template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::clear()
{
    m_future_list.clear();
    m_task_list.clear();
}
714 
// Called from every constructor: resolve the thread-pool (falling back to
// the default pool) and, when the pool is TBB-backed, allocate the owned
// tbb_task_group_t (released in the destructor). Throws std::runtime_error
// if no thread-pool can be obtained.
template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::internal_update()
{
    if(!m_pool)
        m_pool = internal::get_default_threadpool();

    if(!m_pool)
    {
        std::stringstream ss{};
        ss << "[TaskGroup]> " << __FUNCTION__ << "@" << __LINE__
           << " :: nullptr to thread pool";
        throw std::runtime_error(ss.str());
    }

    if(m_pool->is_tbb_threadpool())
    {
        m_tbb_task_group = new tbb_task_group_t{};
    }
}
735 
// Definition of the static verbosity flag (one per template instantiation;
// set via TaskGroup::set_verbose).
template <typename Tp, typename Arg, intmax_t MaxDepth>
int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = 0;
738 
739 }  // namespace PTL
740