Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/src/UserTaskQueue.cc

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3, ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

Diff markup

Differences between /externals/ptl/src/UserTaskQueue.cc (Version 11.3.0) and /externals/ptl/src/UserTaskQueue.cc (Version 11.0.p2)


  1 //                                                  1 //
  2 // MIT License                                      2 // MIT License
  3 // Copyright (c) 2020 Jonathan R. Madsen            3 // Copyright (c) 2020 Jonathan R. Madsen
  4 // Permission is hereby granted, free of charg      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
  5 // of this software and associated documentati      5 // of this software and associated documentation files (the "Software"), to deal
  6 // in the Software without restriction, includ      6 // in the Software without restriction, including without limitation the rights
  7 // to use, copy, modify, merge, publish, distr      7 // to use, copy, modify, merge, publish, distribute, sublicense, and
  8 // copies of the Software, and to permit perso      8 // copies of the Software, and to permit persons to whom the Software is
  9 // furnished to do so, subject to the followin      9 // furnished to do so, subject to the following conditions:
 10 // The above copyright notice and this permiss     10 // The above copyright notice and this permission notice shall be included in
 11 // all copies or substantial portions of the S     11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR     12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT     13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH     14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR     15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS     16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI     17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 18 //                                                 18 //
 19 // -------------------------------------------     19 // ---------------------------------------------------------------
 20 //  Tasking class implementation                   20 //  Tasking class implementation
 21 //  Class Description:                             21 //  Class Description:
 22 //  ------------------------------------------     22 //  ---------------------------------------------------------------
 23 //  Author: Jonathan Madsen                        23 //  Author: Jonathan Madsen
 24 //  ------------------------------------------     24 //  ---------------------------------------------------------------
 25                                                    25 
 26 #include "PTL/UserTaskQueue.hh"                    26 #include "PTL/UserTaskQueue.hh"
 27                                                <<  27 #include "PTL/Task.hh"
 28 #include "PTL/AutoLock.hh"                     << 
 29 #include "PTL/ScopeDestructor.hh"              << 
 30 #include "PTL/TaskGroup.hh"                        28 #include "PTL/TaskGroup.hh"
 31 #include "PTL/ThreadData.hh"                   << 
 32 #include "PTL/ThreadPool.hh"                       29 #include "PTL/ThreadPool.hh"
 33                                                    30 
 34 #include <cassert>                                 31 #include <cassert>
 35 #include <chrono>                              << 
 36 #include <functional>                          << 
 37 #include <iostream>                            << 
 38 #include <map>                                 << 
 39 #include <stdexcept>                           << 
 40 #include <thread>                              << 
 41 #include <utility>                             << 
 42                                                    32 
 43 namespace PTL                                  <<  33 using namespace PTL;
 44 {                                              <<  34 
 45 //============================================     35 //======================================================================================//
 46                                                    36 
 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers     37 UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent)
 48 : VUserTaskQueue(nworkers)                         38 : VUserTaskQueue(nworkers)
 49 , m_is_clone((parent) != nullptr)              <<  39 , m_is_clone((parent) ? true : false)
 50 , m_thread_bin((parent) ? (ThreadPool::get_thi     40 , m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
 51 , m_insert_bin((parent) ? (ThreadPool::get_thi     41 , m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
 52 , m_hold((parent) ? parent->m_hold : new std::     42 , m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
 53 , m_ntasks((parent) ? parent->m_ntasks : new s     43 , m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
 54 , m_mutex((parent) ? parent->m_mutex : new Mut << 
 55 , m_subqueues((parent) ? parent->m_subqueues :     44 , m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
 56 {                                                  45 {
 57     // create nthreads + 1 subqueues so there      46     // create nthreads + 1 subqueues so there is always a subqueue available
 58     if(!parent)                                    47     if(!parent)
 59     {                                              48     {
 60         for(intmax_t i = 0; i < nworkers + 1;      49         for(intmax_t i = 0; i < nworkers + 1; ++i)
 61             m_subqueues->emplace_back(new Task     50             m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
 62     }                                              51     }
                                                   >>  52 
                                                   >>  53 #if defined(DEBUG)
                                                   >>  54     if(GetEnv<int>("PTL_VERBOSE", 0) > 3)
                                                   >>  55     {
                                                   >>  56         RecursiveAutoLock l(TypeMutex<decltype(std::cout), RecursiveMutex>());
                                                   >>  57         std::stringstream ss;
                                                   >>  58         ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " ["
                                                   >>  59            << __FUNCTION__ << ":" << __LINE__ << "] "
                                                   >>  60            << "this = " << this << ", "
                                                   >>  61            << "clone = " << std::boolalpha << m_is_clone << ", "
                                                   >>  62            << "thread = " << m_thread_bin << ", "
                                                   >>  63            << "insert = " << m_insert_bin << ", "
                                                   >>  64            << "hold = " << m_hold->load() << " @ " << m_hold << ", "
                                                   >>  65            << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", "
                                                   >>  66            << "subqueue = " << m_subqueues << ", "
                                                   >>  67            << "size = " << true_size() << ", "
                                                   >>  68            << "empty = " << true_empty();
                                                   >>  69         std::cout << ss.str() << std::endl;
                                                   >>  70     }
                                                   >>  71 #endif
 63 }                                                  72 }
 64                                                    73 
 65 //============================================     74 //======================================================================================//
 66                                                    75 
 67 UserTaskQueue::~UserTaskQueue()                    76 UserTaskQueue::~UserTaskQueue()
 68 {                                                  77 {
 69     if(!m_is_clone)                                78     if(!m_is_clone)
 70     {                                              79     {
 71         for(auto& itr : *m_subqueues)              80         for(auto& itr : *m_subqueues)
 72         {                                          81         {
 73             assert(itr->empty());                  82             assert(itr->empty());
 74             delete itr;                            83             delete itr;
 75         }                                          84         }
 76         m_subqueues->clear();                      85         m_subqueues->clear();
 77         delete m_hold;                             86         delete m_hold;
 78         delete m_ntasks;                           87         delete m_ntasks;
 79         delete m_mutex;                        << 
 80         delete m_subqueues;                        88         delete m_subqueues;
 81     }                                              89     }
 82 }                                                  90 }
 83                                                    91 
 84 //============================================     92 //======================================================================================//
 85                                                    93 
 86 void                                               94 void
 87 UserTaskQueue::resize(intmax_t n)                  95 UserTaskQueue::resize(intmax_t n)
 88 {                                                  96 {
 89     if(!m_mutex)                               <<  97     AutoLock l(m_mutex);
 90         throw std::runtime_error("nullptr to m << 
 91     AutoLock lk(m_mutex);                      << 
 92     if(m_workers < n)                              98     if(m_workers < n)
 93     {                                              99     {
 94         while(m_workers < n)                      100         while(m_workers < n)
 95         {                                         101         {
 96             m_subqueues->emplace_back(new Task    102             m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
 97             ++m_workers;                          103             ++m_workers;
 98         }                                         104         }
 99     }                                             105     }
100     else if(m_workers > n)                        106     else if(m_workers > n)
101     {                                             107     {
102         while(m_workers > n)                      108         while(m_workers > n)
103         {                                         109         {
104             delete m_subqueues->back();           110             delete m_subqueues->back();
105             m_subqueues->pop_back();              111             m_subqueues->pop_back();
106             --m_workers;                          112             --m_workers;
107         }                                         113         }
108     }                                             114     }
109 }                                                 115 }
110                                                   116 
111 //============================================    117 //======================================================================================//
112                                                   118 
113 VUserTaskQueue*                                   119 VUserTaskQueue*
114 UserTaskQueue::clone()                            120 UserTaskQueue::clone()
115 {                                                 121 {
116     return new UserTaskQueue(workers(), this);    122     return new UserTaskQueue(workers(), this);
117 }                                                 123 }
118 //============================================    124 //======================================================================================//
119                                                   125 
120 intmax_t                                          126 intmax_t
121 UserTaskQueue::GetThreadBin() const               127 UserTaskQueue::GetThreadBin() const
122 {                                                 128 {
123     // get a thread id number                     129     // get a thread id number
124     static thread_local intmax_t tl_bin =         130     static thread_local intmax_t tl_bin =
125         (m_thread_bin + ThreadPool::get_this_t    131         (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
126     return tl_bin;                                132     return tl_bin;
127 }                                                 133 }
128                                                   134 
129 //============================================    135 //======================================================================================//
130                                                   136 
131 intmax_t                                          137 intmax_t
132 UserTaskQueue::GetInsertBin() const               138 UserTaskQueue::GetInsertBin() const
133 {                                                 139 {
134     return (++m_insert_bin % (m_workers + 1));    140     return (++m_insert_bin % (m_workers + 1));
135 }                                                 141 }
136                                                   142 
137 //============================================    143 //======================================================================================//
138                                                   144 
139 UserTaskQueue::task_pointer                       145 UserTaskQueue::task_pointer
140 UserTaskQueue::GetThreadBinTask()                 146 UserTaskQueue::GetThreadBinTask()
141 {                                                 147 {
142     intmax_t      tbin      = GetThreadBin();     148     intmax_t      tbin      = GetThreadBin();
143     TaskSubQueue* task_subq = (*m_subqueues)[t    149     TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
144     task_pointer  _task     = nullptr;            150     task_pointer  _task     = nullptr;
145                                                   151 
146     //----------------------------------------    152     //------------------------------------------------------------------------//
147     auto get_task = [&]() {                       153     auto get_task = [&]() {
148         if(task_subq->AcquireClaim())             154         if(task_subq->AcquireClaim())
149         {                                         155         {
150             // run task                           156             // run task
151             _task = task_subq->PopTask(true);     157             _task = task_subq->PopTask(true);
152             // release the claim on the bin       158             // release the claim on the bin
153             task_subq->ReleaseClaim();            159             task_subq->ReleaseClaim();
154         }                                         160         }
155         if(_task)                                 161         if(_task)
156             --(*m_ntasks);                        162             --(*m_ntasks);
157         // return success if valid pointer        163         // return success if valid pointer
158         return (_task != nullptr);                164         return (_task != nullptr);
159     };                                            165     };
160     //----------------------------------------    166     //------------------------------------------------------------------------//
161                                                   167 
162     // while not empty                            168     // while not empty
163     while(!task_subq->empty())                    169     while(!task_subq->empty())
164     {                                             170     {
165         if(get_task())                            171         if(get_task())
166             break;                                172             break;
167     }                                             173     }
168     return _task;                                 174     return _task;
169 }                                                 175 }
170                                                   176 
171 //============================================    177 //======================================================================================//
172                                                   178 
173 UserTaskQueue::task_pointer                       179 UserTaskQueue::task_pointer
174 UserTaskQueue::GetTask(intmax_t subq, intmax_t    180 UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
175 {                                                 181 {
176     // exit if empty                              182     // exit if empty
177     if(this->true_empty())                        183     if(this->true_empty())
178         return nullptr;                           184         return nullptr;
179                                                   185 
180     // ensure the thread has a bin assignment     186     // ensure the thread has a bin assignment
181     intmax_t tbin = GetThreadBin();               187     intmax_t tbin = GetThreadBin();
182     intmax_t n    = (subq < 0) ? tbin : subq;     188     intmax_t n    = (subq < 0) ? tbin : subq;
183     if(nitr < 1)                                  189     if(nitr < 1)
184         nitr = (m_workers + 1);  // * m_ntasks    190         nitr = (m_workers + 1);  // * m_ntasks->load(std::memory_order_relaxed);
185                                                   191 
186     if(m_hold->load(std::memory_order_relaxed)    192     if(m_hold->load(std::memory_order_relaxed))
187     {                                             193     {
188         return GetThreadBinTask();                194         return GetThreadBinTask();
189     }                                             195     }
190                                                   196 
191     task_pointer _task = nullptr;                 197     task_pointer _task = nullptr;
192     //----------------------------------------    198     //------------------------------------------------------------------------//
193     auto get_task = [&](intmax_t _n) {            199     auto get_task = [&](intmax_t _n) {
194         TaskSubQueue* task_subq = (*m_subqueue    200         TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
195         // try to acquire a claim for the bin     201         // try to acquire a claim for the bin
196         // if acquired, no other threads will     202         // if acquired, no other threads will access bin until claim is released
197         if(!task_subq->empty() && task_subq->A    203         if(!task_subq->empty() && task_subq->AcquireClaim())
198         {                                         204         {
199             // pop task out of bin                205             // pop task out of bin
200             _task = task_subq->PopTask(n == tb    206             _task = task_subq->PopTask(n == tbin);
201             // release the claim on the bin       207             // release the claim on the bin
202             task_subq->ReleaseClaim();            208             task_subq->ReleaseClaim();
203         }                                         209         }
204         if(_task)                                 210         if(_task)
205             --(*m_ntasks);                        211             --(*m_ntasks);
206         // return success if valid pointer        212         // return success if valid pointer
207         return (_task != nullptr);                213         return (_task != nullptr);
208     };                                            214     };
209     //----------------------------------------    215     //------------------------------------------------------------------------//
210                                                   216 
211     // there are num_workers+1 bins so there i    217     // there are num_workers+1 bins so there is always a bin that is open
212     // execute num_workers+2 iterations so the    218     // execute num_workers+2 iterations so the thread checks its bin twice
213     // while(!empty())                            219     // while(!empty())
214     {                                             220     {
215         for(intmax_t i = 0; i < nitr; ++i, ++n    221         for(intmax_t i = 0; i < nitr; ++i, ++n)
216         {                                         222         {
217             if(get_task(n % (m_workers + 1)))     223             if(get_task(n % (m_workers + 1)))
218                 return _task;                     224                 return _task;
219         }                                         225         }
220     }                                             226     }
221                                                   227 
222     // only reached if looped over all bins (a    228     // only reached if looped over all bins (and looked in own bin twice)
223     // and found no work so return an empty ta    229     // and found no work so return an empty task and the thread will be put to
224     // sleep if there is still no work by the     230     // sleep if there is still no work by the time it reaches its
225     // condition variable                         231     // condition variable
226     return _task;                                 232     return _task;
227 }                                                 233 }
228                                                   234 
229 //============================================    235 //======================================================================================//
230                                                   236 
231 intmax_t                                          237 intmax_t
232 UserTaskQueue::InsertTask(task_pointer&& task,    238 UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq)
233 {                                                 239 {
234     // increment number of tasks                  240     // increment number of tasks
235     ++(*m_ntasks);                                241     ++(*m_ntasks);
236                                                   242 
237     bool     spin = m_hold->load(std::memory_o    243     bool     spin = m_hold->load(std::memory_order_relaxed);
238     intmax_t tbin = GetThreadBin();               244     intmax_t tbin = GetThreadBin();
239                                                   245 
240     if(data && data->within_task)                 246     if(data && data->within_task)
241     {                                             247     {
242         subq = tbin;                              248         subq = tbin;
243         // spin = true;                           249         // spin = true;
244     }                                             250     }
245                                                   251 
246     // subq is -1 unless specified so unless s    252     // subq is -1 unless specified so unless specified
247     // GetInsertBin() call increments a counte    253     // GetInsertBin() call increments a counter and returns
248     // counter % (num_workers + 1) so that tas    254     // counter % (num_workers + 1) so that tasks are distributed evenly
249     // among the bins                             255     // among the bins
250     intmax_t n = (subq < 0) ? GetInsertBin() :    256     intmax_t n = (subq < 0) ? GetInsertBin() : subq;
251                                                   257 
252     //----------------------------------------    258     //------------------------------------------------------------------------//
253     auto insert_task = [&](intmax_t _n) {         259     auto insert_task = [&](intmax_t _n) {
254         TaskSubQueue* task_subq = (*m_subqueue    260         TaskSubQueue* task_subq = (*m_subqueues)[_n];
255         // TaskSubQueue* next_subq = (*m_subqu    261         // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
256         // if not threads bin and size differe    262         // if not threads bin and size difference, insert into smaller
257         // if(n != tbin && next_subq->size() <    263         // if(n != tbin && next_subq->size() < task_subq->size())
258         //    task_subq = next_subq;              264         //    task_subq = next_subq;
259         // try to acquire a claim for the bin     265         // try to acquire a claim for the bin
260         // if acquired, no other threads will     266         // if acquired, no other threads will access bin until claim is released
261         if(task_subq->AcquireClaim())             267         if(task_subq->AcquireClaim())
262         {                                         268         {
263             // push the task into the bin         269             // push the task into the bin
264             task_subq->PushTask(std::move(task    270             task_subq->PushTask(std::move(task));
265             // release the claim on the bin       271             // release the claim on the bin
266             task_subq->ReleaseClaim();            272             task_subq->ReleaseClaim();
267             // return success                     273             // return success
268             return true;                          274             return true;
269         }                                         275         }
270         return false;                             276         return false;
271     };                                            277     };
272     //----------------------------------------    278     //------------------------------------------------------------------------//
273                                                   279 
274     // if not in "hold/spin mode", where threa    280     // if not in "hold/spin mode", where thread only inserts tasks into
275     // specified bin, then move onto next bin     281     // specified bin, then move onto next bin
276     //                                            282     //
277     if(spin)                                      283     if(spin)
278     {                                             284     {
279         n = n % (m_workers + 1);                  285         n = n % (m_workers + 1);
280         while(!insert_task(n))                    286         while(!insert_task(n))
281             ;                                     287             ;
282         return n;                                 288         return n;
283     }                                             289     }
284                                                   290 
285     // there are num_workers+1 bins so there i    291     // there are num_workers+1 bins so there is always a bin that is open
286     // execute num_workers+2 iterations so the    292     // execute num_workers+2 iterations so the thread checks its bin twice
287     while(true)                                   293     while(true)
288     {                                             294     {
289         auto _n = (n++) % (m_workers + 1);        295         auto _n = (n++) % (m_workers + 1);
290         if(insert_task(_n))                       296         if(insert_task(_n))
291             return _n;                            297             return _n;
292     }                                             298     }
                                                   >> 299     return GetThreadBin();
293 }                                                 300 }
294                                                   301 
295 //============================================    302 //======================================================================================//
296                                                   303 
297 void                                              304 void
298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool*    305 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func)
299 {                                                 306 {
300     using task_group_type      = TaskGroup<int    307     using task_group_type      = TaskGroup<int, int>;
301     using thread_execute_map_t = std::map<int6    308     using thread_execute_map_t = std::map<int64_t, bool>;
302                                                   309 
303     if(!tp->is_alive())                           310     if(!tp->is_alive())
304     {                                             311     {
305         func();                                   312         func();
306         return;                                   313         return;
307     }                                             314     }
308                                                   315 
309     task_group_type tg{ [](int& ref, int i) {     316     task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
310                                                   317 
311     // wait for all threads to finish any work    318     // wait for all threads to finish any work
312     // NOTE: will cause deadlock if called fro    319     // NOTE: will cause deadlock if called from a task
313     while(tp->get_active_threads_count() > 0)     320     while(tp->get_active_threads_count() > 0)
314         ThisThread::sleep_for(std::chrono::mil    321         ThisThread::sleep_for(std::chrono::milliseconds(10));
315                                                   322 
316     thread_execute_map_t                thread    323     thread_execute_map_t                thread_execute_map{};
317     std::vector<std::shared_ptr<VTask>> _tasks    324     std::vector<std::shared_ptr<VTask>> _tasks{};
318     _tasks.reserve(m_workers + 1);                325     _tasks.reserve(m_workers + 1);
319                                                   326 
320     AcquireHold();                                327     AcquireHold();
321     for(int i = 0; i < (m_workers + 1); ++i)      328     for(int i = 0; i < (m_workers + 1); ++i)
322     {                                             329     {
323         if(i == GetThreadBin())                   330         if(i == GetThreadBin())
324             continue;                             331             continue;
325                                                   332 
326         //------------------------------------    333         //--------------------------------------------------------------------//
327         auto thread_specific_func = [&]() {       334         auto thread_specific_func = [&]() {
328             ScopeDestructor _dtor = tg.get_sco    335             ScopeDestructor _dtor = tg.get_scope_destructor();
329             static Mutex    _mtx;                 336             static Mutex    _mtx;
330             _mtx.lock();                          337             _mtx.lock();
331             bool& _executed = thread_execute_m    338             bool& _executed = thread_execute_map[GetThreadBin()];
332             _mtx.unlock();                        339             _mtx.unlock();
333             if(!_executed)                        340             if(!_executed)
334             {                                     341             {
335                 func();                           342                 func();
336                 _executed = true;                 343                 _executed = true;
337                 return 1;                         344                 return 1;
338             }                                     345             }
339             return 0;                             346             return 0;
340         };                                        347         };
341         //------------------------------------    348         //--------------------------------------------------------------------//
342                                                   349 
343         InsertTask(tg.wrap(thread_specific_fun    350         InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
344     }                                             351     }
345                                                   352 
346     tp->notify_all();                             353     tp->notify_all();
347     int nexecuted = tg.join();                    354     int nexecuted = tg.join();
348     if(nexecuted != m_workers)                    355     if(nexecuted != m_workers)
349     {                                             356     {
350         std::stringstream msg;                    357         std::stringstream msg;
351         msg << "Failure executing routine on a    358         msg << "Failure executing routine on all threads! Only " << nexecuted
352             << " threads executed function out    359             << " threads executed function out of " << m_workers << " workers";
353         std::cerr << msg.str() << std::endl;      360         std::cerr << msg.str() << std::endl;
354     }                                             361     }
355     ReleaseHold();                                362     ReleaseHold();
356 }                                                 363 }
357                                                   364 
358 //============================================    365 //======================================================================================//
359                                                   366 
360 void                                              367 void
361 UserTaskQueue::ExecuteOnSpecificThreads(Thread    368 UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
362                                         functi    369                                         function_type func)
363 {                                                 370 {
364     using task_group_type      = TaskGroup<int    371     using task_group_type      = TaskGroup<int, int>;
365     using thread_execute_map_t = std::map<int6    372     using thread_execute_map_t = std::map<int64_t, bool>;
366                                                   373 
367     task_group_type tg{ [](int& ref, int i) {     374     task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
368                                                   375 
369     // wait for all threads to finish any work    376     // wait for all threads to finish any work
370     // NOTE: will cause deadlock if called fro    377     // NOTE: will cause deadlock if called from a task
371     while(tp->get_active_threads_count() > 0)     378     while(tp->get_active_threads_count() > 0)
372         ThisThread::sleep_for(std::chrono::mil    379         ThisThread::sleep_for(std::chrono::milliseconds(10));
373                                                   380 
374     if(!tp->is_alive())                           381     if(!tp->is_alive())
375     {                                             382     {
376         func();                                   383         func();
377         return;                                   384         return;
378     }                                             385     }
379                                                   386 
380     thread_execute_map_t thread_execute_map{};    387     thread_execute_map_t thread_execute_map{};
381                                                   388 
382     //========================================    389     //========================================================================//
383     // wrap the function so that it will only     390     // wrap the function so that it will only be executed if the thread
384     // has an ID in the set                       391     // has an ID in the set
385     auto thread_specific_func = [&]() {           392     auto thread_specific_func = [&]() {
386         ScopeDestructor _dtor = tg.get_scope_d    393         ScopeDestructor _dtor = tg.get_scope_destructor();
387         static Mutex    _mtx;                     394         static Mutex    _mtx;
388         _mtx.lock();                              395         _mtx.lock();
389         bool& _executed = thread_execute_map[G    396         bool& _executed = thread_execute_map[GetThreadBin()];
390         _mtx.unlock();                            397         _mtx.unlock();
391         if(!_executed && tid_set.count(ThisThr    398         if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
392         {                                         399         {
393             func();                               400             func();
394             _executed = true;                     401             _executed = true;
395             return 1;                             402             return 1;
396         }                                         403         }
397         return 0;                                 404         return 0;
398     };                                            405     };
399     //========================================    406     //========================================================================//
400                                                   407 
401     if(tid_set.count(ThisThread::get_id()) > 0    408     if(tid_set.count(ThisThread::get_id()) > 0)
402         func();                                   409         func();
403                                                   410 
404     AcquireHold();                                411     AcquireHold();
405     for(int i = 0; i < (m_workers + 1); ++i)      412     for(int i = 0; i < (m_workers + 1); ++i)
406     {                                             413     {
407         if(i == GetThreadBin())                   414         if(i == GetThreadBin())
408             continue;                             415             continue;
409                                                   416 
410         InsertTask(tg.wrap(thread_specific_fun    417         InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
411     }                                             418     }
412     tp->notify_all();                             419     tp->notify_all();
413     decltype(tid_set.size()) nexecuted = tg.jo    420     decltype(tid_set.size()) nexecuted = tg.join();
414     if(nexecuted != tid_set.size())               421     if(nexecuted != tid_set.size())
415     {                                             422     {
416         std::stringstream msg;                    423         std::stringstream msg;
417         msg << "Failure executing routine on s    424         msg << "Failure executing routine on specific threads! Only " << nexecuted
418             << " threads executed function out    425             << " threads executed function out of " << tid_set.size() << " workers";
419         std::cerr << msg.str() << std::endl;      426         std::cerr << msg.str() << std::endl;
420     }                                             427     }
421     ReleaseHold();                                428     ReleaseHold();
422 }                                                 429 }
423                                                   430 
424 //============================================    431 //======================================================================================//
425                                                   432 
426 void                                              433 void
427 UserTaskQueue::AcquireHold()                      434 UserTaskQueue::AcquireHold()
428 {                                                 435 {
429     bool _hold;                                   436     bool _hold;
430     while(!(_hold = m_hold->load(std::memory_o    437     while(!(_hold = m_hold->load(std::memory_order_relaxed)))
431     {                                             438     {
432         m_hold->compare_exchange_strong(_hold,    439         m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
433                                         std::m    440                                         std::memory_order_relaxed);
434     }                                             441     }
435 }                                                 442 }
436                                                   443 
437 //============================================    444 //======================================================================================//
438                                                   445 
439 void                                              446 void
440 UserTaskQueue::ReleaseHold()                      447 UserTaskQueue::ReleaseHold()
441 {                                                 448 {
442     bool _hold;                                   449     bool _hold;
443     while((_hold = m_hold->load(std::memory_or    450     while((_hold = m_hold->load(std::memory_order_relaxed)))
444     {                                             451     {
445         m_hold->compare_exchange_strong(_hold,    452         m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
446                                         std::m    453                                         std::memory_order_relaxed);
447     }                                             454     }
448 }                                                 455 }
449                                                   456 
450 //============================================    457 //======================================================================================//
451                                                << 
452 }  // namespace PTL                            << 
453                                                   458