Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/src/UserTaskQueue.cc

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3 ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

Diff markup

Differences between /externals/ptl/src/UserTaskQueue.cc (Version 11.3.0) and /externals/ptl/src/UserTaskQueue.cc (Version 11.1.1)


  1 //                                                  1 //
  2 // MIT License                                      2 // MIT License
  3 // Copyright (c) 2020 Jonathan R. Madsen            3 // Copyright (c) 2020 Jonathan R. Madsen
  4 // Permission is hereby granted, free of charg      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
  5 // of this software and associated documentati      5 // of this software and associated documentation files (the "Software"), to deal
  6 // in the Software without restriction, includ      6 // in the Software without restriction, including without limitation the rights
  7 // to use, copy, modify, merge, publish, distr      7 // to use, copy, modify, merge, publish, distribute, sublicense, and
  8 // copies of the Software, and to permit perso      8 // copies of the Software, and to permit persons to whom the Software is
  9 // furnished to do so, subject to the followin      9 // furnished to do so, subject to the following conditions:
 10 // The above copyright notice and this permiss     10 // The above copyright notice and this permission notice shall be included in
 11 // all copies or substantial portions of the S     11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR     12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT     13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH     14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR     15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS     16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI     17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 18 //                                                 18 //
 19 // -------------------------------------------     19 // ---------------------------------------------------------------
 20 //  Tasking class implementation                   20 //  Tasking class implementation
 21 //  Class Description:                             21 //  Class Description:
 22 //  ------------------------------------------     22 //  ---------------------------------------------------------------
 23 //  Author: Jonathan Madsen                        23 //  Author: Jonathan Madsen
 24 //  ------------------------------------------     24 //  ---------------------------------------------------------------
 25                                                    25 
 26 #include "PTL/UserTaskQueue.hh"                    26 #include "PTL/UserTaskQueue.hh"
 27                                                    27 
 28 #include "PTL/AutoLock.hh"                         28 #include "PTL/AutoLock.hh"
 29 #include "PTL/ScopeDestructor.hh"              << 
 30 #include "PTL/TaskGroup.hh"                        29 #include "PTL/TaskGroup.hh"
 31 #include "PTL/ThreadData.hh"                       30 #include "PTL/ThreadData.hh"
 32 #include "PTL/ThreadPool.hh"                       31 #include "PTL/ThreadPool.hh"
                                                   >>  32 #include "PTL/Utility.hh"
 33                                                    33 
 34 #include <cassert>                                 34 #include <cassert>
 35 #include <chrono>                                  35 #include <chrono>
 36 #include <functional>                              36 #include <functional>
 37 #include <iostream>                                37 #include <iostream>
 38 #include <map>                                     38 #include <map>
 39 #include <stdexcept>                               39 #include <stdexcept>
                                                   >>  40 #include <system_error>
 40 #include <thread>                                  41 #include <thread>
 41 #include <utility>                                 42 #include <utility>
 42                                                    43 
 43 namespace PTL                                  <<  44 using namespace PTL;
 44 {                                              <<  45 
 45 //============================================     46 //======================================================================================//
 46                                                    47 
 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers     48 UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent)
 48 : VUserTaskQueue(nworkers)                         49 : VUserTaskQueue(nworkers)
 49 , m_is_clone((parent) != nullptr)                  50 , m_is_clone((parent) != nullptr)
 50 , m_thread_bin((parent) ? (ThreadPool::get_thi     51 , m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
 51 , m_insert_bin((parent) ? (ThreadPool::get_thi     52 , m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
 52 , m_hold((parent) ? parent->m_hold : new std::     53 , m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
 53 , m_ntasks((parent) ? parent->m_ntasks : new s     54 , m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
 54 , m_mutex((parent) ? parent->m_mutex : new Mut     55 , m_mutex((parent) ? parent->m_mutex : new Mutex{})
 55 , m_subqueues((parent) ? parent->m_subqueues :     56 , m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
 56 {                                                  57 {
 57     // create nthreads + 1 subqueues so there      58     // create nthreads + 1 subqueues so there is always a subqueue available
 58     if(!parent)                                    59     if(!parent)
 59     {                                              60     {
 60         for(intmax_t i = 0; i < nworkers + 1;      61         for(intmax_t i = 0; i < nworkers + 1; ++i)
 61             m_subqueues->emplace_back(new Task     62             m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
 62     }                                              63     }
                                                   >>  64 
                                                   >>  65 #if defined(DEBUG)
                                                   >>  66     if(GetEnv<int>("PTL_VERBOSE", 0) > 3)
                                                   >>  67     {
                                                   >>  68         RecursiveAutoLock l(TypeMutex<decltype(std::cout), RecursiveMutex>());
                                                   >>  69         std::stringstream ss;
                                                   >>  70         ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " ["
                                                   >>  71            << __FUNCTION__ << ":" << __LINE__ << "] "
                                                   >>  72            << "this = " << this << ", "
                                                   >>  73            << "clone = " << std::boolalpha << m_is_clone << ", "
                                                   >>  74            << "thread = " << m_thread_bin << ", "
                                                   >>  75            << "insert = " << m_insert_bin << ", "
                                                   >>  76            << "hold = " << m_hold->load() << " @ " << m_hold << ", "
                                                   >>  77            << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", "
                                                   >>  78            << "subqueue = " << m_subqueues << ", "
                                                   >>  79            << "size = " << true_size() << ", "
                                                   >>  80            << "empty = " << true_empty();
                                                   >>  81         std::cout << ss.str() << std::endl;
                                                   >>  82     }
                                                   >>  83 #endif
 63 }                                                  84 }
 64                                                    85 
 65 //============================================     86 //======================================================================================//
 66                                                    87 
 67 UserTaskQueue::~UserTaskQueue()                    88 UserTaskQueue::~UserTaskQueue()
 68 {                                                  89 {
 69     if(!m_is_clone)                                90     if(!m_is_clone)
 70     {                                              91     {
 71         for(auto& itr : *m_subqueues)              92         for(auto& itr : *m_subqueues)
 72         {                                          93         {
 73             assert(itr->empty());                  94             assert(itr->empty());
 74             delete itr;                            95             delete itr;
 75         }                                          96         }
 76         m_subqueues->clear();                      97         m_subqueues->clear();
 77         delete m_hold;                             98         delete m_hold;
 78         delete m_ntasks;                           99         delete m_ntasks;
 79         delete m_mutex;                           100         delete m_mutex;
 80         delete m_subqueues;                       101         delete m_subqueues;
 81     }                                             102     }
 82 }                                                 103 }
 83                                                   104 
 84 //============================================    105 //======================================================================================//
 85                                                   106 
 86 void                                              107 void
 87 UserTaskQueue::resize(intmax_t n)                 108 UserTaskQueue::resize(intmax_t n)
 88 {                                                 109 {
 89     if(!m_mutex)                                  110     if(!m_mutex)
 90         throw std::runtime_error("nullptr to m    111         throw std::runtime_error("nullptr to mutex");
 91     AutoLock lk(m_mutex);                         112     AutoLock lk(m_mutex);
 92     if(m_workers < n)                             113     if(m_workers < n)
 93     {                                             114     {
 94         while(m_workers < n)                      115         while(m_workers < n)
 95         {                                         116         {
 96             m_subqueues->emplace_back(new Task    117             m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
 97             ++m_workers;                          118             ++m_workers;
 98         }                                         119         }
 99     }                                             120     }
100     else if(m_workers > n)                        121     else if(m_workers > n)
101     {                                             122     {
102         while(m_workers > n)                      123         while(m_workers > n)
103         {                                         124         {
104             delete m_subqueues->back();           125             delete m_subqueues->back();
105             m_subqueues->pop_back();              126             m_subqueues->pop_back();
106             --m_workers;                          127             --m_workers;
107         }                                         128         }
108     }                                             129     }
109 }                                                 130 }
110                                                   131 
111 //============================================    132 //======================================================================================//
112                                                   133 
113 VUserTaskQueue*                                   134 VUserTaskQueue*
114 UserTaskQueue::clone()                            135 UserTaskQueue::clone()
115 {                                                 136 {
116     return new UserTaskQueue(workers(), this);    137     return new UserTaskQueue(workers(), this);
117 }                                                 138 }
118 //============================================    139 //======================================================================================//
119                                                   140 
120 intmax_t                                          141 intmax_t
121 UserTaskQueue::GetThreadBin() const               142 UserTaskQueue::GetThreadBin() const
122 {                                                 143 {
123     // get a thread id number                     144     // get a thread id number
124     static thread_local intmax_t tl_bin =         145     static thread_local intmax_t tl_bin =
125         (m_thread_bin + ThreadPool::get_this_t    146         (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
126     return tl_bin;                                147     return tl_bin;
127 }                                                 148 }
128                                                   149 
129 //============================================    150 //======================================================================================//
130                                                   151 
131 intmax_t                                          152 intmax_t
132 UserTaskQueue::GetInsertBin() const               153 UserTaskQueue::GetInsertBin() const
133 {                                                 154 {
134     return (++m_insert_bin % (m_workers + 1));    155     return (++m_insert_bin % (m_workers + 1));
135 }                                                 156 }
136                                                   157 
137 //============================================    158 //======================================================================================//
138                                                   159 
139 UserTaskQueue::task_pointer                       160 UserTaskQueue::task_pointer
140 UserTaskQueue::GetThreadBinTask()                 161 UserTaskQueue::GetThreadBinTask()
141 {                                                 162 {
142     intmax_t      tbin      = GetThreadBin();     163     intmax_t      tbin      = GetThreadBin();
143     TaskSubQueue* task_subq = (*m_subqueues)[t    164     TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
144     task_pointer  _task     = nullptr;            165     task_pointer  _task     = nullptr;
145                                                   166 
146     //----------------------------------------    167     //------------------------------------------------------------------------//
147     auto get_task = [&]() {                       168     auto get_task = [&]() {
148         if(task_subq->AcquireClaim())             169         if(task_subq->AcquireClaim())
149         {                                         170         {
150             // run task                           171             // run task
151             _task = task_subq->PopTask(true);     172             _task = task_subq->PopTask(true);
152             // release the claim on the bin       173             // release the claim on the bin
153             task_subq->ReleaseClaim();            174             task_subq->ReleaseClaim();
154         }                                         175         }
155         if(_task)                                 176         if(_task)
156             --(*m_ntasks);                        177             --(*m_ntasks);
157         // return success if valid pointer        178         // return success if valid pointer
158         return (_task != nullptr);                179         return (_task != nullptr);
159     };                                            180     };
160     //----------------------------------------    181     //------------------------------------------------------------------------//
161                                                   182 
162     // while not empty                            183     // while not empty
163     while(!task_subq->empty())                    184     while(!task_subq->empty())
164     {                                             185     {
165         if(get_task())                            186         if(get_task())
166             break;                                187             break;
167     }                                             188     }
168     return _task;                                 189     return _task;
169 }                                                 190 }
170                                                   191 
171 //============================================    192 //======================================================================================//
172                                                   193 
173 UserTaskQueue::task_pointer                       194 UserTaskQueue::task_pointer
174 UserTaskQueue::GetTask(intmax_t subq, intmax_t    195 UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
175 {                                                 196 {
176     // exit if empty                              197     // exit if empty
177     if(this->true_empty())                        198     if(this->true_empty())
178         return nullptr;                           199         return nullptr;
179                                                   200 
180     // ensure the thread has a bin assignment     201     // ensure the thread has a bin assignment
181     intmax_t tbin = GetThreadBin();               202     intmax_t tbin = GetThreadBin();
182     intmax_t n    = (subq < 0) ? tbin : subq;     203     intmax_t n    = (subq < 0) ? tbin : subq;
183     if(nitr < 1)                                  204     if(nitr < 1)
184         nitr = (m_workers + 1);  // * m_ntasks    205         nitr = (m_workers + 1);  // * m_ntasks->load(std::memory_order_relaxed);
185                                                   206 
186     if(m_hold->load(std::memory_order_relaxed)    207     if(m_hold->load(std::memory_order_relaxed))
187     {                                             208     {
188         return GetThreadBinTask();                209         return GetThreadBinTask();
189     }                                             210     }
190                                                   211 
191     task_pointer _task = nullptr;                 212     task_pointer _task = nullptr;
192     //----------------------------------------    213     //------------------------------------------------------------------------//
193     auto get_task = [&](intmax_t _n) {            214     auto get_task = [&](intmax_t _n) {
194         TaskSubQueue* task_subq = (*m_subqueue    215         TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
195         // try to acquire a claim for the bin     216         // try to acquire a claim for the bin
196         // if acquired, no other threads will     217         // if acquired, no other threads will access bin until claim is released
197         if(!task_subq->empty() && task_subq->A    218         if(!task_subq->empty() && task_subq->AcquireClaim())
198         {                                         219         {
199             // pop task out of bin                220             // pop task out of bin
200             _task = task_subq->PopTask(n == tb    221             _task = task_subq->PopTask(n == tbin);
201             // release the claim on the bin       222             // release the claim on the bin
202             task_subq->ReleaseClaim();            223             task_subq->ReleaseClaim();
203         }                                         224         }
204         if(_task)                                 225         if(_task)
205             --(*m_ntasks);                        226             --(*m_ntasks);
206         // return success if valid pointer        227         // return success if valid pointer
207         return (_task != nullptr);                228         return (_task != nullptr);
208     };                                            229     };
209     //----------------------------------------    230     //------------------------------------------------------------------------//
210                                                   231 
211     // there are num_workers+1 bins so there i    232     // there are num_workers+1 bins so there is always a bin that is open
212     // execute num_workers+2 iterations so the    233     // execute num_workers+2 iterations so the thread checks its bin twice
213     // while(!empty())                            234     // while(!empty())
214     {                                             235     {
215         for(intmax_t i = 0; i < nitr; ++i, ++n    236         for(intmax_t i = 0; i < nitr; ++i, ++n)
216         {                                         237         {
217             if(get_task(n % (m_workers + 1)))     238             if(get_task(n % (m_workers + 1)))
218                 return _task;                     239                 return _task;
219         }                                         240         }
220     }                                             241     }
221                                                   242 
222     // only reached if looped over all bins (a    243     // only reached if looped over all bins (and looked in own bin twice)
223     // and found no work so return an empty ta    244     // and found no work so return an empty task and the thread will be put to
224     // sleep if there is still no work by the     245     // sleep if there is still no work by the time it reaches its
225     // condition variable                         246     // condition variable
226     return _task;                                 247     return _task;
227 }                                                 248 }
228                                                   249 
229 //============================================    250 //======================================================================================//
230                                                   251 
231 intmax_t                                          252 intmax_t
232 UserTaskQueue::InsertTask(task_pointer&& task,    253 UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq)
233 {                                                 254 {
234     // increment number of tasks                  255     // increment number of tasks
235     ++(*m_ntasks);                                256     ++(*m_ntasks);
236                                                   257 
237     bool     spin = m_hold->load(std::memory_o    258     bool     spin = m_hold->load(std::memory_order_relaxed);
238     intmax_t tbin = GetThreadBin();               259     intmax_t tbin = GetThreadBin();
239                                                   260 
240     if(data && data->within_task)                 261     if(data && data->within_task)
241     {                                             262     {
242         subq = tbin;                              263         subq = tbin;
243         // spin = true;                           264         // spin = true;
244     }                                             265     }
245                                                   266 
246     // subq is -1 unless specified so unless s    267     // subq is -1 unless specified so unless specified
247     // GetInsertBin() call increments a counte    268     // GetInsertBin() call increments a counter and returns
248     // counter % (num_workers + 1) so that tas    269     // counter % (num_workers + 1) so that tasks are distributed evenly
249     // among the bins                             270     // among the bins
250     intmax_t n = (subq < 0) ? GetInsertBin() :    271     intmax_t n = (subq < 0) ? GetInsertBin() : subq;
251                                                   272 
252     //----------------------------------------    273     //------------------------------------------------------------------------//
253     auto insert_task = [&](intmax_t _n) {         274     auto insert_task = [&](intmax_t _n) {
254         TaskSubQueue* task_subq = (*m_subqueue    275         TaskSubQueue* task_subq = (*m_subqueues)[_n];
255         // TaskSubQueue* next_subq = (*m_subqu    276         // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
256         // if not threads bin and size differe    277         // if not threads bin and size difference, insert into smaller
257         // if(n != tbin && next_subq->size() <    278         // if(n != tbin && next_subq->size() < task_subq->size())
258         //    task_subq = next_subq;              279         //    task_subq = next_subq;
259         // try to acquire a claim for the bin     280         // try to acquire a claim for the bin
260         // if acquired, no other threads will     281         // if acquired, no other threads will access bin until claim is released
261         if(task_subq->AcquireClaim())             282         if(task_subq->AcquireClaim())
262         {                                         283         {
263             // push the task into the bin         284             // push the task into the bin
264             task_subq->PushTask(std::move(task    285             task_subq->PushTask(std::move(task));
265             // release the claim on the bin       286             // release the claim on the bin
266             task_subq->ReleaseClaim();            287             task_subq->ReleaseClaim();
267             // return success                     288             // return success
268             return true;                          289             return true;
269         }                                         290         }
270         return false;                             291         return false;
271     };                                            292     };
272     //----------------------------------------    293     //------------------------------------------------------------------------//
273                                                   294 
274     // if not in "hold/spin mode", where threa    295     // if not in "hold/spin mode", where thread only inserts tasks into
275     // specified bin, then move onto next bin     296     // specified bin, then move onto next bin
276     //                                            297     //
277     if(spin)                                      298     if(spin)
278     {                                             299     {
279         n = n % (m_workers + 1);                  300         n = n % (m_workers + 1);
280         while(!insert_task(n))                    301         while(!insert_task(n))
281             ;                                     302             ;
282         return n;                                 303         return n;
283     }                                             304     }
284                                                   305 
285     // there are num_workers+1 bins so there i    306     // there are num_workers+1 bins so there is always a bin that is open
286     // execute num_workers+2 iterations so the    307     // execute num_workers+2 iterations so the thread checks its bin twice
287     while(true)                                   308     while(true)
288     {                                             309     {
289         auto _n = (n++) % (m_workers + 1);        310         auto _n = (n++) % (m_workers + 1);
290         if(insert_task(_n))                       311         if(insert_task(_n))
291             return _n;                            312             return _n;
292     }                                             313     }
                                                   >> 314     return GetThreadBin();
293 }                                                 315 }
294                                                   316 
295 //============================================    317 //======================================================================================//
296                                                   318 
297 void                                              319 void
298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool*    320 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func)
299 {                                                 321 {
300     using task_group_type      = TaskGroup<int    322     using task_group_type      = TaskGroup<int, int>;
301     using thread_execute_map_t = std::map<int6    323     using thread_execute_map_t = std::map<int64_t, bool>;
302                                                   324 
303     if(!tp->is_alive())                           325     if(!tp->is_alive())
304     {                                             326     {
305         func();                                   327         func();
306         return;                                   328         return;
307     }                                             329     }
308                                                   330 
309     task_group_type tg{ [](int& ref, int i) {     331     task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
310                                                   332 
311     // wait for all threads to finish any work    333     // wait for all threads to finish any work
312     // NOTE: will cause deadlock if called fro    334     // NOTE: will cause deadlock if called from a task
313     while(tp->get_active_threads_count() > 0)     335     while(tp->get_active_threads_count() > 0)
314         ThisThread::sleep_for(std::chrono::mil    336         ThisThread::sleep_for(std::chrono::milliseconds(10));
315                                                   337 
316     thread_execute_map_t                thread    338     thread_execute_map_t                thread_execute_map{};
317     std::vector<std::shared_ptr<VTask>> _tasks    339     std::vector<std::shared_ptr<VTask>> _tasks{};
318     _tasks.reserve(m_workers + 1);                340     _tasks.reserve(m_workers + 1);
319                                                   341 
320     AcquireHold();                                342     AcquireHold();
321     for(int i = 0; i < (m_workers + 1); ++i)      343     for(int i = 0; i < (m_workers + 1); ++i)
322     {                                             344     {
323         if(i == GetThreadBin())                   345         if(i == GetThreadBin())
324             continue;                             346             continue;
325                                                   347 
326         //------------------------------------    348         //--------------------------------------------------------------------//
327         auto thread_specific_func = [&]() {       349         auto thread_specific_func = [&]() {
328             ScopeDestructor _dtor = tg.get_sco    350             ScopeDestructor _dtor = tg.get_scope_destructor();
329             static Mutex    _mtx;                 351             static Mutex    _mtx;
330             _mtx.lock();                          352             _mtx.lock();
331             bool& _executed = thread_execute_m    353             bool& _executed = thread_execute_map[GetThreadBin()];
332             _mtx.unlock();                        354             _mtx.unlock();
333             if(!_executed)                        355             if(!_executed)
334             {                                     356             {
335                 func();                           357                 func();
336                 _executed = true;                 358                 _executed = true;
337                 return 1;                         359                 return 1;
338             }                                     360             }
339             return 0;                             361             return 0;
340         };                                        362         };
341         //------------------------------------    363         //--------------------------------------------------------------------//
342                                                   364 
343         InsertTask(tg.wrap(thread_specific_fun    365         InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
344     }                                             366     }
345                                                   367 
346     tp->notify_all();                             368     tp->notify_all();
347     int nexecuted = tg.join();                    369     int nexecuted = tg.join();
348     if(nexecuted != m_workers)                    370     if(nexecuted != m_workers)
349     {                                             371     {
350         std::stringstream msg;                    372         std::stringstream msg;
351         msg << "Failure executing routine on a    373         msg << "Failure executing routine on all threads! Only " << nexecuted
352             << " threads executed function out    374             << " threads executed function out of " << m_workers << " workers";
353         std::cerr << msg.str() << std::endl;      375         std::cerr << msg.str() << std::endl;
354     }                                             376     }
355     ReleaseHold();                                377     ReleaseHold();
356 }                                                 378 }
357                                                   379 
358 //============================================    380 //======================================================================================//
359                                                   381 
360 void                                              382 void
361 UserTaskQueue::ExecuteOnSpecificThreads(Thread    383 UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
362                                         functi    384                                         function_type func)
363 {                                                 385 {
364     using task_group_type      = TaskGroup<int    386     using task_group_type      = TaskGroup<int, int>;
365     using thread_execute_map_t = std::map<int6    387     using thread_execute_map_t = std::map<int64_t, bool>;
366                                                   388 
367     task_group_type tg{ [](int& ref, int i) {     389     task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
368                                                   390 
369     // wait for all threads to finish any work    391     // wait for all threads to finish any work
370     // NOTE: will cause deadlock if called fro    392     // NOTE: will cause deadlock if called from a task
371     while(tp->get_active_threads_count() > 0)     393     while(tp->get_active_threads_count() > 0)
372         ThisThread::sleep_for(std::chrono::mil    394         ThisThread::sleep_for(std::chrono::milliseconds(10));
373                                                   395 
374     if(!tp->is_alive())                           396     if(!tp->is_alive())
375     {                                             397     {
376         func();                                   398         func();
377         return;                                   399         return;
378     }                                             400     }
379                                                   401 
380     thread_execute_map_t thread_execute_map{};    402     thread_execute_map_t thread_execute_map{};
381                                                   403 
382     //========================================    404     //========================================================================//
383     // wrap the function so that it will only     405     // wrap the function so that it will only be executed if the thread
384     // has an ID in the set                       406     // has an ID in the set
385     auto thread_specific_func = [&]() {           407     auto thread_specific_func = [&]() {
386         ScopeDestructor _dtor = tg.get_scope_d    408         ScopeDestructor _dtor = tg.get_scope_destructor();
387         static Mutex    _mtx;                     409         static Mutex    _mtx;
388         _mtx.lock();                              410         _mtx.lock();
389         bool& _executed = thread_execute_map[G    411         bool& _executed = thread_execute_map[GetThreadBin()];
390         _mtx.unlock();                            412         _mtx.unlock();
391         if(!_executed && tid_set.count(ThisThr    413         if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
392         {                                         414         {
393             func();                               415             func();
394             _executed = true;                     416             _executed = true;
395             return 1;                             417             return 1;
396         }                                         418         }
397         return 0;                                 419         return 0;
398     };                                            420     };
399     //========================================    421     //========================================================================//
400                                                   422 
401     if(tid_set.count(ThisThread::get_id()) > 0    423     if(tid_set.count(ThisThread::get_id()) > 0)
402         func();                                   424         func();
403                                                   425 
404     AcquireHold();                                426     AcquireHold();
405     for(int i = 0; i < (m_workers + 1); ++i)      427     for(int i = 0; i < (m_workers + 1); ++i)
406     {                                             428     {
407         if(i == GetThreadBin())                   429         if(i == GetThreadBin())
408             continue;                             430             continue;
409                                                   431 
410         InsertTask(tg.wrap(thread_specific_fun    432         InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
411     }                                             433     }
412     tp->notify_all();                             434     tp->notify_all();
413     decltype(tid_set.size()) nexecuted = tg.jo    435     decltype(tid_set.size()) nexecuted = tg.join();
414     if(nexecuted != tid_set.size())               436     if(nexecuted != tid_set.size())
415     {                                             437     {
416         std::stringstream msg;                    438         std::stringstream msg;
417         msg << "Failure executing routine on s    439         msg << "Failure executing routine on specific threads! Only " << nexecuted
418             << " threads executed function out    440             << " threads executed function out of " << tid_set.size() << " workers";
419         std::cerr << msg.str() << std::endl;      441         std::cerr << msg.str() << std::endl;
420     }                                             442     }
421     ReleaseHold();                                443     ReleaseHold();
422 }                                                 444 }
423                                                   445 
424 //============================================    446 //======================================================================================//
425                                                   447 
426 void                                              448 void
427 UserTaskQueue::AcquireHold()                      449 UserTaskQueue::AcquireHold()
428 {                                                 450 {
429     bool _hold;                                   451     bool _hold;
430     while(!(_hold = m_hold->load(std::memory_o    452     while(!(_hold = m_hold->load(std::memory_order_relaxed)))
431     {                                             453     {
432         m_hold->compare_exchange_strong(_hold,    454         m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
433                                         std::m    455                                         std::memory_order_relaxed);
434     }                                             456     }
435 }                                                 457 }
436                                                   458 
437 //============================================    459 //======================================================================================//
438                                                   460 
439 void                                              461 void
440 UserTaskQueue::ReleaseHold()                      462 UserTaskQueue::ReleaseHold()
441 {                                                 463 {
442     bool _hold;                                   464     bool _hold;
443     while((_hold = m_hold->load(std::memory_or    465     while((_hold = m_hold->load(std::memory_order_relaxed)))
444     {                                             466     {
445         m_hold->compare_exchange_strong(_hold,    467         m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
446                                         std::m    468                                         std::memory_order_relaxed);
447     }                                             469     }
448 }                                                 470 }
449                                                   471 
450 //============================================    472 //======================================================================================//
451                                                << 
452 }  // namespace PTL                            << 
453                                                   473