Geant4 Cross Reference: Geant4/externals/ptl/src/UserTaskQueue.cc

//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------
//  Tasking class implementation
//  Class Description:
//      UserTaskQueue implements VUserTaskQueue as a set of (num_workers + 1)
//      task sub-queues ("bins"); tasks are distributed round-robin across the
//      bins and idle threads steal work from other bins when their own bin is
//      empty.
//  ---------------------------------------------------------------
//  Author: Jonathan Madsen
//  ---------------------------------------------------------------

#include "PTL/UserTaskQueue.hh"

#include "PTL/AutoLock.hh"
#include "PTL/ScopeDestructor.hh"
#include "PTL/TaskGroup.hh"
#include "PTL/ThreadData.hh"
#include "PTL/ThreadPool.hh"

#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

namespace PTL
{
//======================================================================================//

UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent)
: VUserTaskQueue(nworkers)
, m_is_clone((parent) != nullptr)
, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
, m_mutex((parent) ? parent->m_mutex : new Mutex{})
, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
{
    // create nthreads + 1 subqueues so there is always a subqueue available
    if(!parent)
    {
        for(intmax_t i = 0; i < nworkers + 1; ++i)
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
    }
}

//======================================================================================//

UserTaskQueue::~UserTaskQueue()
{
    if(!m_is_clone)
    {
        for(auto& itr : *m_subqueues)
        {
            assert(itr->empty());
            delete itr;
        }
        m_subqueues->clear();
        delete m_hold;
        delete m_ntasks;
        delete m_mutex;
        delete m_subqueues;
    }
}

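// Illustrative sketch (exposition only; the names below are hypothetical and not part
// of the PTL API): the constructor/destructor pair above follows an "owner vs. clone"
// convention -- the first queue allocates the shared flag/counter/mutex/bin container,
// clones copy the raw pointers from the parent, and only the non-clone deletes them.
// A minimal stand-alone model of that ownership rule, reusing the std::atomic_uintmax_t
// type already used by this file:
namespace  // exposition only
{
struct shared_state_demo
{
    explicit shared_state_demo(shared_state_demo* parent = nullptr)
    : m_is_clone(parent != nullptr)
    , m_counter((parent) ? parent->m_counter : new std::atomic_uintmax_t(0))
    {}

    ~shared_state_demo()
    {
        // clones never free the shared state; only the original owner does
        if(!m_is_clone)
            delete m_counter;
    }

    bool                   m_is_clone;
    std::atomic_uintmax_t* m_counter;
};
}  // namespace
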
//======================================================================================//

void
UserTaskQueue::resize(intmax_t n)
{
    if(!m_mutex)
        throw std::runtime_error("nullptr to mutex");
    AutoLock lk(m_mutex);
    if(m_workers < n)
    {
        while(m_workers < n)
        {
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
            ++m_workers;
        }
    }
    else if(m_workers > n)
    {
        while(m_workers > n)
        {
            delete m_subqueues->back();
            m_subqueues->pop_back();
            --m_workers;
        }
    }
}

//======================================================================================//

VUserTaskQueue*
UserTaskQueue::clone()
{
    return new UserTaskQueue(workers(), this);
}
//======================================================================================//

intmax_t
UserTaskQueue::GetThreadBin() const
{
    // get a thread id number
    static thread_local intmax_t tl_bin =
        (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
    return tl_bin;
}

//======================================================================================//

intmax_t
UserTaskQueue::GetInsertBin() const
{
    return (++m_insert_bin % (m_workers + 1));
}

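// Illustrative sketch (exposition only, hypothetical name; the counter is modeled here
// with a std::atomic, while the real member's type is declared in UserTaskQueue.hh):
// GetThreadBin() pins each calling thread to one bin via a thread_local cache, while
// GetInsertBin() simply advances a counter so that successive insertions cycle evenly
// over all (m_workers + 1) bins. The index arithmetic reduces to:
namespace  // exposition only
{
inline intmax_t
round_robin_bin_demo(std::atomic<intmax_t>& counter, intmax_t nworkers)
{
    // successive calls return 1 % N, 2 % N, 3 % N, ... for N = nworkers + 1 bins
    return (++counter) % (nworkers + 1);
}
}  // namespace
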
//======================================================================================//

UserTaskQueue::task_pointer
UserTaskQueue::GetThreadBinTask()
{
    intmax_t      tbin      = GetThreadBin();
    TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
    task_pointer  _task     = nullptr;

    //------------------------------------------------------------------------//
    auto get_task = [&]() {
        if(task_subq->AcquireClaim())
        {
            // run task
            _task = task_subq->PopTask(true);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // while not empty
    while(!task_subq->empty())
    {
        if(get_task())
            break;
    }
    return _task;
}

//======================================================================================//

UserTaskQueue::task_pointer
UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
{
    // exit if empty
    if(this->true_empty())
        return nullptr;

    // ensure the thread has a bin assignment
    intmax_t tbin = GetThreadBin();
    intmax_t n    = (subq < 0) ? tbin : subq;
    if(nitr < 1)
        nitr = (m_workers + 1);  // * m_ntasks->load(std::memory_order_relaxed);

    if(m_hold->load(std::memory_order_relaxed))
    {
        return GetThreadBinTask();
    }

    task_pointer _task = nullptr;
    //------------------------------------------------------------------------//
    auto get_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(!task_subq->empty() && task_subq->AcquireClaim())
        {
            // pop task out of bin
            _task = task_subq->PopTask(n == tbin);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // there are num_workers + 1 bins so there is always a bin that is open;
    // execute nitr iterations (num_workers + 1 by default, i.e. one pass over
    // every bin starting from this thread's own bin)
    // while(!empty())
    {
        for(intmax_t i = 0; i < nitr; ++i, ++n)
        {
            if(get_task(n % (m_workers + 1)))
                return _task;
        }
    }

    // only reached if the loop went over all the bins and found no work, so
    // return an empty task; the thread will be put to sleep if there is still
    // no work by the time it reaches its condition variable
    return _task;
}

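// Illustrative sketch (exposition only; demo_bin and steal_scan_demo are hypothetical,
// not PTL types): GetTask() is a bounded work-stealing scan -- start at the calling
// thread's own bin, walk all (m_workers + 1) bins with modular index arithmetic, and
// for each bin try to claim it, pop a task if one is present, and release the claim.
// A busy or empty bin is skipped rather than waited on. A stand-alone model of that
// scan, with each bin reduced to a claim flag plus a pending-task counter:
namespace  // exposition only
{
struct demo_bin
{
    std::atomic<bool> claim{ false };  // AcquireClaim()/ReleaseClaim() analogue
    std::atomic<int>  pending{ 0 };    // number of queued tasks in this bin
};

// returns the bin index a task was taken from, or -1 if one pass found nothing
inline intmax_t
steal_scan_demo(std::vector<demo_bin>& bins, intmax_t my_bin)
{
    auto nbins = static_cast<intmax_t>(bins.size());  // m_workers + 1 in the real queue
    for(intmax_t i = 0; i < nbins; ++i)
    {
        auto  idx = (my_bin + i) % nbins;  // own bin first, then steal from the others
        auto& bin = bins[idx];
        // try-lock the bin; if another thread holds the claim, just move on
        if(!bin.claim.exchange(true, std::memory_order_acquire))
        {
            bool took = false;
            if(bin.pending.load(std::memory_order_relaxed) > 0)
            {
                bin.pending.fetch_sub(1, std::memory_order_relaxed);  // PopTask() analogue
                took = true;
            }
            bin.claim.store(false, std::memory_order_release);
            if(took)
                return idx;
        }
    }
    return -1;  // no work found; the real queue lets the caller go back to sleep
}
}  // namespace
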
//======================================================================================//

intmax_t
UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq)
{
    // increment number of tasks
    ++(*m_ntasks);

    bool     spin = m_hold->load(std::memory_order_relaxed);
    intmax_t tbin = GetThreadBin();

    if(data && data->within_task)
    {
        subq = tbin;
        // spin = true;
    }

    // subq is -1 unless the caller specified a bin; when it is unspecified,
    // GetInsertBin() increments a counter and returns
    // counter % (num_workers + 1) so that tasks are distributed evenly
    // among the bins
    intmax_t n = (subq < 0) ? GetInsertBin() : subq;

    //------------------------------------------------------------------------//
    auto insert_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n];
        // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
        // if not threads bin and size difference, insert into smaller
        // if(n != tbin && next_subq->size() < task_subq->size())
        //    task_subq = next_subq;
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(task_subq->AcquireClaim())
        {
            // push the task into the bin
            task_subq->PushTask(std::move(task));
            // release the claim on the bin
            task_subq->ReleaseClaim();
            // return success
            return true;
        }
        return false;
    };
    //------------------------------------------------------------------------//

    // in "hold/spin mode" the thread only inserts tasks into the specified bin,
    // spinning on that bin until the claim succeeds; otherwise it moves on to
    // the next bin whenever a claim fails
    //
    if(spin)
    {
        n = n % (m_workers + 1);
        while(!insert_task(n))
            ;
        return n;
    }

    // there are num_workers + 1 bins so there is always a bin that is open;
    // keep advancing over the bins until a claim on one of them succeeds
    while(true)
    {
        auto _n = (n++) % (m_workers + 1);
        if(insert_task(_n))
            return _n;
    }
}

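// Illustrative sketch (exposition only, reusing the hypothetical demo_bin type from the
// sketch after GetTask()): in normal operation the insert loop keeps advancing over the
// bins until a claim succeeds; with (m_workers + 1) bins and at most m_workers threads
// holding a claim at any instant, an unclaimed bin always exists, so the loop
// terminates. In "hold" mode the bin index stays fixed and the loop spins on that one
// bin instead.
namespace  // exposition only
{
inline intmax_t
insert_scan_demo(std::vector<demo_bin>& bins, intmax_t start, bool hold)
{
    auto     nbins = static_cast<intmax_t>(bins.size());
    intmax_t n     = start % nbins;
    while(true)
    {
        if(!bins[n].claim.exchange(true, std::memory_order_acquire))  // AcquireClaim()
        {
            bins[n].pending.fetch_add(1, std::memory_order_relaxed);  // PushTask() analogue
            bins[n].claim.store(false, std::memory_order_release);    // ReleaseClaim()
            return n;
        }
        if(!hold)
            n = (n + 1) % nbins;  // move on to the next bin; spin in place when holding
    }
}
}  // namespace
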
//======================================================================================//

void
UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func)
{
    using task_group_type      = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    if(!tp->is_alive())
    {
        func();
        return;
    }

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    thread_execute_map_t                thread_execute_map{};
    std::vector<std::shared_ptr<VTask>> _tasks{};
    _tasks.reserve(m_workers + 1);

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        //--------------------------------------------------------------------//
        auto thread_specific_func = [&]() {
            ScopeDestructor _dtor = tg.get_scope_destructor();
            static Mutex    _mtx;
            _mtx.lock();
            bool& _executed = thread_execute_map[GetThreadBin()];
            _mtx.unlock();
            if(!_executed)
            {
                func();
                _executed = true;
                return 1;
            }
            return 0;
        };
        //--------------------------------------------------------------------//

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }

    tp->notify_all();
    int nexecuted = tg.join();
    if(nexecuted != m_workers)
    {
        std::stringstream msg;
        msg << "Failure executing routine on all threads! Only " << nexecuted
            << " threads executed function out of " << m_workers << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}

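// Illustrative usage sketch (exposition only; the caller below is hypothetical and it
// assumes function_type is compatible with a plain void() lambda): run a routine once
// on every worker thread, e.g. to initialize thread-local state. As the note above
// says, this must not be called from inside a task or the wait for idle workers will
// deadlock.
namespace  // exposition only
{
inline void
execute_on_all_threads_demo(UserTaskQueue& queue, ThreadPool* tp)
{
    queue.ExecuteOnAllThreads(tp, []() {
        static thread_local int tl_scratch = 0;  // touch some thread-local state
        ++tl_scratch;
    });
}
}  // namespace
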
//======================================================================================//

void
UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
                                        function_type func)
{
    using task_group_type      = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    if(!tp->is_alive())
    {
        func();
        return;
    }

    thread_execute_map_t thread_execute_map{};

    //========================================================================//
    // wrap the function so that it will only be executed if the thread
    // has an ID in the set
    auto thread_specific_func = [&]() {
        ScopeDestructor _dtor = tg.get_scope_destructor();
        static Mutex    _mtx;
        _mtx.lock();
        bool& _executed = thread_execute_map[GetThreadBin()];
        _mtx.unlock();
        if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
        {
            func();
            _executed = true;
            return 1;
        }
        return 0;
    };
    //========================================================================//

    if(tid_set.count(ThisThread::get_id()) > 0)
        func();

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }
    tp->notify_all();
    decltype(tid_set.size()) nexecuted = tg.join();
    if(nexecuted != tid_set.size())
    {
        std::stringstream msg;
        msg << "Failure executing routine on specific threads! Only " << nexecuted
            << " threads executed function out of " << tid_set.size() << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}

//======================================================================================//

void
UserTaskQueue::AcquireHold()
{
    bool _hold;
    while(!(_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}

//======================================================================================//

void
UserTaskQueue::ReleaseHold()
{
    bool _hold;
    while((_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}

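// Illustrative sketch (exposition only, hypothetical name): AcquireHold() and
// ReleaseHold() both reduce to "flip the shared atomic flag with a compare-and-swap
// loop", re-reading the flag until the new value is observed. A stand-alone
// equivalent, using compare_exchange_weak and retrying on spurious failures:
namespace  // exposition only
{
inline void
set_flag_demo(std::atomic<bool>& flag, bool desired)
{
    bool expected = flag.load(std::memory_order_relaxed);
    while(expected != desired &&
          !flag.compare_exchange_weak(expected, desired, std::memory_order_release,
                                      std::memory_order_relaxed))
    {
        // on failure 'expected' is refreshed with the current value; retry
    }
}
// set_flag_demo(flag, true) plays the role of AcquireHold(), and
// set_flag_demo(flag, false) the role of ReleaseHold().
}  // namespace
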

//======================================================================================//

}  // namespace PTL