Geant4 Cross Reference

Cross-Referencing   Geant4
Geant4/externals/ptl/src/UserTaskQueue.cc

Version: [ ReleaseNotes ] [ 1.0 ] [ 1.1 ] [ 2.0 ] [ 3.0 ] [ 3.1 ] [ 3.2 ] [ 4.0 ] [ 4.0.p1 ] [ 4.0.p2 ] [ 4.1 ] [ 4.1.p1 ] [ 5.0 ] [ 5.0.p1 ] [ 5.1 ] [ 5.1.p1 ] [ 5.2 ] [ 5.2.p1 ] [ 5.2.p2 ] [ 6.0 ] [ 6.0.p1 ] [ 6.1 ] [ 6.2 ] [ 6.2.p1 ] [ 6.2.p2 ] [ 7.0 ] [ 7.0.p1 ] [ 7.1 ] [ 7.1.p1 ] [ 8.0 ] [ 8.0.p1 ] [ 8.1 ] [ 8.1.p1 ] [ 8.1.p2 ] [ 8.2 ] [ 8.2.p1 ] [ 8.3 ] [ 8.3.p1 ] [ 8.3.p2 ] [ 9.0 ] [ 9.0.p1 ] [ 9.0.p2 ] [ 9.1 ] [ 9.1.p1 ] [ 9.1.p2 ] [ 9.1.p3 ] [ 9.2 ] [ 9.2.p1 ] [ 9.2.p2 ] [ 9.2.p3 ] [ 9.2.p4 ] [ 9.3 ] [ 9.3.p1 ] [ 9.3.p2 ] [ 9.4 ] [ 9.4.p1 ] [ 9.4.p2 ] [ 9.4.p3 ] [ 9.4.p4 ] [ 9.5 ] [ 9.5.p1 ] [ 9.5.p2 ] [ 9.6 ] [ 9.6.p1 ] [ 9.6.p2 ] [ 9.6.p3 ] [ 9.6.p4 ] [ 10.0 ] [ 10.0.p1 ] [ 10.0.p2 ] [ 10.0.p3 ] [ 10.0.p4 ] [ 10.1 ] [ 10.1.p1 ] [ 10.1.p2 ] [ 10.1.p3 ] [ 10.2 ] [ 10.2.p1 ] [ 10.2.p2 ] [ 10.2.p3 ] [ 10.3 ] [ 10.3.p1 ] [ 10.3.p2 ] [ 10.3.p3 ] [ 10.4 ] [ 10.4.p1 ] [ 10.4.p2 ] [ 10.4.p3 ] [ 10.5 ] [ 10.5.p1 ] [ 10.6 ] [ 10.6.p1 ] [ 10.6.p2 ] [ 10.6.p3 ] [ 10.7 ] [ 10.7.p1 ] [ 10.7.p2 ] [ 10.7.p3 ] [ 10.7.p4 ] [ 11.0 ] [ 11.0.p1 ] [ 11.0.p2 ] [ 11.0.p3, ] [ 11.0.p4 ] [ 11.1 ] [ 11.1.1 ] [ 11.1.2 ] [ 11.1.3 ] [ 11.2 ] [ 11.2.1 ] [ 11.2.2 ] [ 11.3.0 ]

Diff markup

Differences between /externals/ptl/src/UserTaskQueue.cc (Version 11.3.0) and /externals/ptl/src/UserTaskQueue.cc (Version 9.1.p1)


  1 //                                                  1 
  2 // MIT License                                    
  3 // Copyright (c) 2020 Jonathan R. Madsen          
  4 // Permission is hereby granted, free of charg    
  5 // of this software and associated documentati    
  6 // in the Software without restriction, includ    
  7 // to use, copy, modify, merge, publish, distr    
  8 // copies of the Software, and to permit perso    
  9 // furnished to do so, subject to the followin    
 10 // The above copyright notice and this permiss    
 11 // all copies or substantial portions of the S    
 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR    
 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT    
 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH    
 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR    
 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS    
 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI    
 18 //                                                
 19 // -------------------------------------------    
 20 //  Tasking class implementation                  
 21 //  Class Description:                            
 22 //  ------------------------------------------    
 23 //  Author: Jonathan Madsen                       
 24 //  ------------------------------------------    
 25                                                   
 26 #include "PTL/UserTaskQueue.hh"                   
 27                                                   
 28 #include "PTL/AutoLock.hh"                        
 29 #include "PTL/ScopeDestructor.hh"                 
 30 #include "PTL/TaskGroup.hh"                       
 31 #include "PTL/ThreadData.hh"                      
 32 #include "PTL/ThreadPool.hh"                      
 33                                                   
 34 #include <cassert>                                
 35 #include <chrono>                                 
 36 #include <functional>                             
 37 #include <iostream>                               
 38 #include <map>                                    
 39 #include <stdexcept>                              
 40 #include <thread>                                 
 41 #include <utility>                                
 42                                                   
 43 namespace PTL                                     
 44 {                                                 
 45 //============================================    
 46                                                   
 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers    
 48 : VUserTaskQueue(nworkers)                        
 49 , m_is_clone((parent) != nullptr)                 
 50 , m_thread_bin((parent) ? (ThreadPool::get_thi    
 51 , m_insert_bin((parent) ? (ThreadPool::get_thi    
 52 , m_hold((parent) ? parent->m_hold : new std::    
 53 , m_ntasks((parent) ? parent->m_ntasks : new s    
 54 , m_mutex((parent) ? parent->m_mutex : new Mut    
 55 , m_subqueues((parent) ? parent->m_subqueues :    
 56 {                                                 
 57     // create nthreads + 1 subqueues so there     
 58     if(!parent)                                   
 59     {                                             
 60         for(intmax_t i = 0; i < nworkers + 1;     
 61             m_subqueues->emplace_back(new Task    
 62     }                                             
 63 }                                                 
 64                                                   
 65 //============================================    
 66                                                   
 67 UserTaskQueue::~UserTaskQueue()                   
 68 {                                                 
 69     if(!m_is_clone)                               
 70     {                                             
 71         for(auto& itr : *m_subqueues)             
 72         {                                         
 73             assert(itr->empty());                 
 74             delete itr;                           
 75         }                                         
 76         m_subqueues->clear();                     
 77         delete m_hold;                            
 78         delete m_ntasks;                          
 79         delete m_mutex;                           
 80         delete m_subqueues;                       
 81     }                                             
 82 }                                                 
 83                                                   
 84 //============================================    
 85                                                   
 86 void                                              
 87 UserTaskQueue::resize(intmax_t n)                 
 88 {                                                 
 89     if(!m_mutex)                                  
 90         throw std::runtime_error("nullptr to m    
 91     AutoLock lk(m_mutex);                         
 92     if(m_workers < n)                             
 93     {                                             
 94         while(m_workers < n)                      
 95         {                                         
 96             m_subqueues->emplace_back(new Task    
 97             ++m_workers;                          
 98         }                                         
 99     }                                             
100     else if(m_workers > n)                        
101     {                                             
102         while(m_workers > n)                      
103         {                                         
104             delete m_subqueues->back();           
105             m_subqueues->pop_back();              
106             --m_workers;                          
107         }                                         
108     }                                             
109 }                                                 
110                                                   
111 //============================================    
112                                                   
113 VUserTaskQueue*                                   
114 UserTaskQueue::clone()                            
115 {                                                 
116     return new UserTaskQueue(workers(), this);    
117 }                                                 
118 //============================================    
119                                                   
120 intmax_t                                          
121 UserTaskQueue::GetThreadBin() const               
122 {                                                 
123     // get a thread id number                     
124     static thread_local intmax_t tl_bin =         
125         (m_thread_bin + ThreadPool::get_this_t    
126     return tl_bin;                                
127 }                                                 
128                                                   
129 //============================================    
130                                                   
131 intmax_t                                          
132 UserTaskQueue::GetInsertBin() const               
133 {                                                 
134     return (++m_insert_bin % (m_workers + 1));    
135 }                                                 
136                                                   
137 //============================================    
138                                                   
139 UserTaskQueue::task_pointer                       
140 UserTaskQueue::GetThreadBinTask()                 
141 {                                                 
142     intmax_t      tbin      = GetThreadBin();     
143     TaskSubQueue* task_subq = (*m_subqueues)[t    
144     task_pointer  _task     = nullptr;            
145                                                   
146     //----------------------------------------    
147     auto get_task = [&]() {                       
148         if(task_subq->AcquireClaim())             
149         {                                         
150             // run task                           
151             _task = task_subq->PopTask(true);     
152             // release the claim on the bin       
153             task_subq->ReleaseClaim();            
154         }                                         
155         if(_task)                                 
156             --(*m_ntasks);                        
157         // return success if valid pointer        
158         return (_task != nullptr);                
159     };                                            
160     //----------------------------------------    
161                                                   
162     // while not empty                            
163     while(!task_subq->empty())                    
164     {                                             
165         if(get_task())                            
166             break;                                
167     }                                             
168     return _task;                                 
169 }                                                 
170                                                   
171 //============================================    
172                                                   
173 UserTaskQueue::task_pointer                       
174 UserTaskQueue::GetTask(intmax_t subq, intmax_t    
175 {                                                 
176     // exit if empty                              
177     if(this->true_empty())                        
178         return nullptr;                           
179                                                   
180     // ensure the thread has a bin assignment     
181     intmax_t tbin = GetThreadBin();               
182     intmax_t n    = (subq < 0) ? tbin : subq;     
183     if(nitr < 1)                                  
184         nitr = (m_workers + 1);  // * m_ntasks    
185                                                   
186     if(m_hold->load(std::memory_order_relaxed)    
187     {                                             
188         return GetThreadBinTask();                
189     }                                             
190                                                   
191     task_pointer _task = nullptr;                 
192     //----------------------------------------    
193     auto get_task = [&](intmax_t _n) {            
194         TaskSubQueue* task_subq = (*m_subqueue    
195         // try to acquire a claim for the bin     
196         // if acquired, no other threads will     
197         if(!task_subq->empty() && task_subq->A    
198         {                                         
199             // pop task out of bin                
200             _task = task_subq->PopTask(n == tb    
201             // release the claim on the bin       
202             task_subq->ReleaseClaim();            
203         }                                         
204         if(_task)                                 
205             --(*m_ntasks);                        
206         // return success if valid pointer        
207         return (_task != nullptr);                
208     };                                            
209     //----------------------------------------    
210                                                   
211     // there are num_workers+1 bins so there i    
212     // execute num_workers+2 iterations so the    
213     // while(!empty())                            
214     {                                             
215         for(intmax_t i = 0; i < nitr; ++i, ++n    
216         {                                         
217             if(get_task(n % (m_workers + 1)))     
218                 return _task;                     
219         }                                         
220     }                                             
221                                                   
222     // only reached if looped over all bins (a    
223     // and found no work so return an empty ta    
224     // sleep if there is still no work by the     
225     // condition variable                         
226     return _task;                                 
227 }                                                 
228                                                   
229 //============================================    
230                                                   
231 intmax_t                                          
232 UserTaskQueue::InsertTask(task_pointer&& task,    
233 {                                                 
234     // increment number of tasks                  
235     ++(*m_ntasks);                                
236                                                   
237     bool     spin = m_hold->load(std::memory_o    
238     intmax_t tbin = GetThreadBin();               
239                                                   
240     if(data && data->within_task)                 
241     {                                             
242         subq = tbin;                              
243         // spin = true;                           
244     }                                             
245                                                   
246     // subq is -1 unless specified so unless s    
247     // GetInsertBin() call increments a counte    
248     // counter % (num_workers + 1) so that tas    
249     // among the bins                             
250     intmax_t n = (subq < 0) ? GetInsertBin() :    
251                                                   
252     //----------------------------------------    
253     auto insert_task = [&](intmax_t _n) {         
254         TaskSubQueue* task_subq = (*m_subqueue    
255         // TaskSubQueue* next_subq = (*m_subqu    
256         // if not threads bin and size differe    
257         // if(n != tbin && next_subq->size() <    
258         //    task_subq = next_subq;              
259         // try to acquire a claim for the bin     
260         // if acquired, no other threads will     
261         if(task_subq->AcquireClaim())             
262         {                                         
263             // push the task into the bin         
264             task_subq->PushTask(std::move(task    
265             // release the claim on the bin       
266             task_subq->ReleaseClaim();            
267             // return success                     
268             return true;                          
269         }                                         
270         return false;                             
271     };                                            
272     //----------------------------------------    
273                                                   
274     // if not in "hold/spin mode", where threa    
275     // specified bin, then move onto next bin     
276     //                                            
277     if(spin)                                      
278     {                                             
279         n = n % (m_workers + 1);                  
280         while(!insert_task(n))                    
281             ;                                     
282         return n;                                 
283     }                                             
284                                                   
285     // there are num_workers+1 bins so there i    
286     // execute num_workers+2 iterations so the    
287     while(true)                                   
288     {                                             
289         auto _n = (n++) % (m_workers + 1);        
290         if(insert_task(_n))                       
291             return _n;                            
292     }                                             
293 }                                                 
294                                                   
295 //============================================    
296                                                   
297 void                                              
298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool*    
299 {                                                 
300     using task_group_type      = TaskGroup<int    
301     using thread_execute_map_t = std::map<int6    
302                                                   
303     if(!tp->is_alive())                           
304     {                                             
305         func();                                   
306         return;                                   
307     }                                             
308                                                   
309     task_group_type tg{ [](int& ref, int i) {     
310                                                   
311     // wait for all threads to finish any work    
312     // NOTE: will cause deadlock if called fro    
313     while(tp->get_active_threads_count() > 0)     
314         ThisThread::sleep_for(std::chrono::mil    
315                                                   
316     thread_execute_map_t                thread    
317     std::vector<std::shared_ptr<VTask>> _tasks    
318     _tasks.reserve(m_workers + 1);                
319                                                   
320     AcquireHold();                                
321     for(int i = 0; i < (m_workers + 1); ++i)      
322     {                                             
323         if(i == GetThreadBin())                   
324             continue;                             
325                                                   
326         //------------------------------------    
327         auto thread_specific_func = [&]() {       
328             ScopeDestructor _dtor = tg.get_sco    
329             static Mutex    _mtx;                 
330             _mtx.lock();                          
331             bool& _executed = thread_execute_m    
332             _mtx.unlock();                        
333             if(!_executed)                        
334             {                                     
335                 func();                           
336                 _executed = true;                 
337                 return 1;                         
338             }                                     
339             return 0;                             
340         };                                        
341         //------------------------------------    
342                                                   
343         InsertTask(tg.wrap(thread_specific_fun    
344     }                                             
345                                                   
346     tp->notify_all();                             
347     int nexecuted = tg.join();                    
348     if(nexecuted != m_workers)                    
349     {                                             
350         std::stringstream msg;                    
351         msg << "Failure executing routine on a    
352             << " threads executed function out    
353         std::cerr << msg.str() << std::endl;      
354     }                                             
355     ReleaseHold();                                
356 }                                                 
357                                                   
358 //============================================    
359                                                   
360 void                                              
361 UserTaskQueue::ExecuteOnSpecificThreads(Thread    
362                                         functi    
363 {                                                 
364     using task_group_type      = TaskGroup<int    
365     using thread_execute_map_t = std::map<int6    
366                                                   
367     task_group_type tg{ [](int& ref, int i) {     
368                                                   
369     // wait for all threads to finish any work    
370     // NOTE: will cause deadlock if called fro    
371     while(tp->get_active_threads_count() > 0)     
372         ThisThread::sleep_for(std::chrono::mil    
373                                                   
374     if(!tp->is_alive())                           
375     {                                             
376         func();                                   
377         return;                                   
378     }                                             
379                                                   
380     thread_execute_map_t thread_execute_map{};    
381                                                   
382     //========================================    
383     // wrap the function so that it will only     
384     // has an ID in the set                       
385     auto thread_specific_func = [&]() {           
386         ScopeDestructor _dtor = tg.get_scope_d    
387         static Mutex    _mtx;                     
388         _mtx.lock();                              
389         bool& _executed = thread_execute_map[G    
390         _mtx.unlock();                            
391         if(!_executed && tid_set.count(ThisThr    
392         {                                         
393             func();                               
394             _executed = true;                     
395             return 1;                             
396         }                                         
397         return 0;                                 
398     };                                            
399     //========================================    
400                                                   
401     if(tid_set.count(ThisThread::get_id()) > 0    
402         func();                                   
403                                                   
404     AcquireHold();                                
405     for(int i = 0; i < (m_workers + 1); ++i)      
406     {                                             
407         if(i == GetThreadBin())                   
408             continue;                             
409                                                   
410         InsertTask(tg.wrap(thread_specific_fun    
411     }                                             
412     tp->notify_all();                             
413     decltype(tid_set.size()) nexecuted = tg.jo    
414     if(nexecuted != tid_set.size())               
415     {                                             
416         std::stringstream msg;                    
417         msg << "Failure executing routine on s    
418             << " threads executed function out    
419         std::cerr << msg.str() << std::endl;      
420     }                                             
421     ReleaseHold();                                
422 }                                                 
423                                                   
424 //============================================    
425                                                   
426 void                                              
427 UserTaskQueue::AcquireHold()                      
428 {                                                 
429     bool _hold;                                   
430     while(!(_hold = m_hold->load(std::memory_o    
431     {                                             
432         m_hold->compare_exchange_strong(_hold,    
433                                         std::m    
434     }                                             
435 }                                                 
436                                                   
437 //============================================    
438                                                   
439 void                                              
440 UserTaskQueue::ReleaseHold()                      
441 {                                                 
442     bool _hold;                                   
443     while((_hold = m_hold->load(std::memory_or    
444     {                                             
445         m_hold->compare_exchange_strong(_hold,    
446                                         std::m    
447     }                                             
448 }                                                 
449                                                   
450 //============================================    
451                                                   
452 }  // namespace PTL                               
453