Geant4 Cross Reference |
//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18 // 19 // --------------------------------------------------------------- 20 // Tasking class implementation 21 // Class Description: 22 // --------------------------------------------------------------- 23 // Author: Jonathan Madsen 24 // --------------------------------------------------------------- 25 26 #include "PTL/UserTaskQueue.hh" 27 28 #include "PTL/AutoLock.hh" 29 #include "PTL/ScopeDestructor.hh" 30 #include "PTL/TaskGroup.hh" 31 #include "PTL/ThreadData.hh" 32 #include "PTL/ThreadPool.hh" 33 34 #include <cassert> 35 #include <chrono> 36 #include <functional> 37 #include <iostream> 38 #include <map> 39 #include <stdexcept> 40 #include <thread> 41 #include <utility> 42 43 namespace PTL 44 { 45 //======================================================================================// 46 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent) 48 : VUserTaskQueue(nworkers) 49 , m_is_clone((parent) != nullptr) 50 , m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 51 , m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 52 , m_hold((parent) ? parent->m_hold : new std::atomic_bool(false)) 53 , m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0)) 54 , m_mutex((parent) ? parent->m_mutex : new Mutex{}) 55 , m_subqueues((parent) ? 
parent->m_subqueues : new TaskSubQueueContainer()) 56 { 57 // create nthreads + 1 subqueues so there is always a subqueue available 58 if(!parent) 59 { 60 for(intmax_t i = 0; i < nworkers + 1; ++i) 61 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks)); 62 } 63 } 64 65 //======================================================================================// 66 67 UserTaskQueue::~UserTaskQueue() 68 { 69 if(!m_is_clone) 70 { 71 for(auto& itr : *m_subqueues) 72 { 73 assert(itr->empty()); 74 delete itr; 75 } 76 m_subqueues->clear(); 77 delete m_hold; 78 delete m_ntasks; 79 delete m_mutex; 80 delete m_subqueues; 81 } 82 } 83 84 //======================================================================================// 85 86 void 87 UserTaskQueue::resize(intmax_t n) 88 { 89 if(!m_mutex) 90 throw std::runtime_error("nullptr to mutex"); 91 AutoLock lk(m_mutex); 92 if(m_workers < n) 93 { 94 while(m_workers < n) 95 { 96 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks)); 97 ++m_workers; 98 } 99 } 100 else if(m_workers > n) 101 { 102 while(m_workers > n) 103 { 104 delete m_subqueues->back(); 105 m_subqueues->pop_back(); 106 --m_workers; 107 } 108 } 109 } 110 111 //======================================================================================// 112 113 VUserTaskQueue* 114 UserTaskQueue::clone() 115 { 116 return new UserTaskQueue(workers(), this); 117 } 118 //======================================================================================// 119 120 intmax_t 121 UserTaskQueue::GetThreadBin() const 122 { 123 // get a thread id number 124 static thread_local intmax_t tl_bin = 125 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1); 126 return tl_bin; 127 } 128 129 //======================================================================================// 130 131 intmax_t 132 UserTaskQueue::GetInsertBin() const 133 { 134 return (++m_insert_bin % (m_workers + 1)); 135 } 136 137 
//======================================================================================// 138 139 UserTaskQueue::task_pointer 140 UserTaskQueue::GetThreadBinTask() 141 { 142 intmax_t tbin = GetThreadBin(); 143 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)]; 144 task_pointer _task = nullptr; 145 146 //------------------------------------------------------------------------// 147 auto get_task = [&]() { 148 if(task_subq->AcquireClaim()) 149 { 150 // run task 151 _task = task_subq->PopTask(true); 152 // release the claim on the bin 153 task_subq->ReleaseClaim(); 154 } 155 if(_task) 156 --(*m_ntasks); 157 // return success if valid pointer 158 return (_task != nullptr); 159 }; 160 //------------------------------------------------------------------------// 161 162 // while not empty 163 while(!task_subq->empty()) 164 { 165 if(get_task()) 166 break; 167 } 168 return _task; 169 } 170 171 //======================================================================================// 172 173 UserTaskQueue::task_pointer 174 UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr) 175 { 176 // exit if empty 177 if(this->true_empty()) 178 return nullptr; 179 180 // ensure the thread has a bin assignment 181 intmax_t tbin = GetThreadBin(); 182 intmax_t n = (subq < 0) ? 
tbin : subq; 183 if(nitr < 1) 184 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed); 185 186 if(m_hold->load(std::memory_order_relaxed)) 187 { 188 return GetThreadBinTask(); 189 } 190 191 task_pointer _task = nullptr; 192 //------------------------------------------------------------------------// 193 auto get_task = [&](intmax_t _n) { 194 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)]; 195 // try to acquire a claim for the bin 196 // if acquired, no other threads will access bin until claim is released 197 if(!task_subq->empty() && task_subq->AcquireClaim()) 198 { 199 // pop task out of bin 200 _task = task_subq->PopTask(n == tbin); 201 // release the claim on the bin 202 task_subq->ReleaseClaim(); 203 } 204 if(_task) 205 --(*m_ntasks); 206 // return success if valid pointer 207 return (_task != nullptr); 208 }; 209 //------------------------------------------------------------------------// 210 211 // there are num_workers+1 bins so there is always a bin that is open 212 // execute num_workers+2 iterations so the thread checks its bin twice 213 // while(!empty()) 214 { 215 for(intmax_t i = 0; i < nitr; ++i, ++n) 216 { 217 if(get_task(n % (m_workers + 1))) 218 return _task; 219 } 220 } 221 222 // only reached if looped over all bins (and looked in own bin twice) 223 // and found no work so return an empty task and the thread will be put to 224 // sleep if there is still no work by the time it reaches its 225 // condition variable 226 return _task; 227 } 228 229 //======================================================================================// 230 231 intmax_t 232 UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq) 233 { 234 // increment number of tasks 235 ++(*m_ntasks); 236 237 bool spin = m_hold->load(std::memory_order_relaxed); 238 intmax_t tbin = GetThreadBin(); 239 240 if(data && data->within_task) 241 { 242 subq = tbin; 243 // spin = true; 244 } 245 246 // subq is -1 unless specified so 
unless specified 247 // GetInsertBin() call increments a counter and returns 248 // counter % (num_workers + 1) so that tasks are distributed evenly 249 // among the bins 250 intmax_t n = (subq < 0) ? GetInsertBin() : subq; 251 252 //------------------------------------------------------------------------// 253 auto insert_task = [&](intmax_t _n) { 254 TaskSubQueue* task_subq = (*m_subqueues)[_n]; 255 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)]; 256 // if not threads bin and size difference, insert into smaller 257 // if(n != tbin && next_subq->size() < task_subq->size()) 258 // task_subq = next_subq; 259 // try to acquire a claim for the bin 260 // if acquired, no other threads will access bin until claim is released 261 if(task_subq->AcquireClaim()) 262 { 263 // push the task into the bin 264 task_subq->PushTask(std::move(task)); 265 // release the claim on the bin 266 task_subq->ReleaseClaim(); 267 // return success 268 return true; 269 } 270 return false; 271 }; 272 //------------------------------------------------------------------------// 273 274 // if not in "hold/spin mode", where thread only inserts tasks into 275 // specified bin, then move onto next bin 276 // 277 if(spin) 278 { 279 n = n % (m_workers + 1); 280 while(!insert_task(n)) 281 ; 282 return n; 283 } 284 285 // there are num_workers+1 bins so there is always a bin that is open 286 // execute num_workers+2 iterations so the thread checks its bin twice 287 while(true) 288 { 289 auto _n = (n++) % (m_workers + 1); 290 if(insert_task(_n)) 291 return _n; 292 } 293 } 294 295 //======================================================================================// 296 297 void 298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func) 299 { 300 using task_group_type = TaskGroup<int, int>; 301 using thread_execute_map_t = std::map<int64_t, bool>; 302 303 if(!tp->is_alive()) 304 { 305 func(); 306 return; 307 } 308 309 task_group_type tg{ [](int& ref, int i) { 
return (ref += i); }, tp }; 310 311 // wait for all threads to finish any work 312 // NOTE: will cause deadlock if called from a task 313 while(tp->get_active_threads_count() > 0) 314 ThisThread::sleep_for(std::chrono::milliseconds(10)); 315 316 thread_execute_map_t thread_execute_map{}; 317 std::vector<std::shared_ptr<VTask>> _tasks{}; 318 _tasks.reserve(m_workers + 1); 319 320 AcquireHold(); 321 for(int i = 0; i < (m_workers + 1); ++i) 322 { 323 if(i == GetThreadBin()) 324 continue; 325 326 //--------------------------------------------------------------------// 327 auto thread_specific_func = [&]() { 328 ScopeDestructor _dtor = tg.get_scope_destructor(); 329 static Mutex _mtx; 330 _mtx.lock(); 331 bool& _executed = thread_execute_map[GetThreadBin()]; 332 _mtx.unlock(); 333 if(!_executed) 334 { 335 func(); 336 _executed = true; 337 return 1; 338 } 339 return 0; 340 }; 341 //--------------------------------------------------------------------// 342 343 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i); 344 } 345 346 tp->notify_all(); 347 int nexecuted = tg.join(); 348 if(nexecuted != m_workers) 349 { 350 std::stringstream msg; 351 msg << "Failure executing routine on all threads! 
Only " << nexecuted 352 << " threads executed function out of " << m_workers << " workers"; 353 std::cerr << msg.str() << std::endl; 354 } 355 ReleaseHold(); 356 } 357 358 //======================================================================================// 359 360 void 361 UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp, 362 function_type func) 363 { 364 using task_group_type = TaskGroup<int, int>; 365 using thread_execute_map_t = std::map<int64_t, bool>; 366 367 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp }; 368 369 // wait for all threads to finish any work 370 // NOTE: will cause deadlock if called from a task 371 while(tp->get_active_threads_count() > 0) 372 ThisThread::sleep_for(std::chrono::milliseconds(10)); 373 374 if(!tp->is_alive()) 375 { 376 func(); 377 return; 378 } 379 380 thread_execute_map_t thread_execute_map{}; 381 382 //========================================================================// 383 // wrap the function so that it will only be executed if the thread 384 // has an ID in the set 385 auto thread_specific_func = [&]() { 386 ScopeDestructor _dtor = tg.get_scope_destructor(); 387 static Mutex _mtx; 388 _mtx.lock(); 389 bool& _executed = thread_execute_map[GetThreadBin()]; 390 _mtx.unlock(); 391 if(!_executed && tid_set.count(ThisThread::get_id()) > 0) 392 { 393 func(); 394 _executed = true; 395 return 1; 396 } 397 return 0; 398 }; 399 //========================================================================// 400 401 if(tid_set.count(ThisThread::get_id()) > 0) 402 func(); 403 404 AcquireHold(); 405 for(int i = 0; i < (m_workers + 1); ++i) 406 { 407 if(i == GetThreadBin()) 408 continue; 409 410 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i); 411 } 412 tp->notify_all(); 413 decltype(tid_set.size()) nexecuted = tg.join(); 414 if(nexecuted != tid_set.size()) 415 { 416 std::stringstream msg; 417 msg << "Failure executing routine on specific threads! 
Only " << nexecuted 418 << " threads executed function out of " << tid_set.size() << " workers"; 419 std::cerr << msg.str() << std::endl; 420 } 421 ReleaseHold(); 422 } 423 424 //======================================================================================// 425 426 void 427 UserTaskQueue::AcquireHold() 428 { 429 bool _hold; 430 while(!(_hold = m_hold->load(std::memory_order_relaxed))) 431 { 432 m_hold->compare_exchange_strong(_hold, true, std::memory_order_release, 433 std::memory_order_relaxed); 434 } 435 } 436 437 //======================================================================================// 438 439 void 440 UserTaskQueue::ReleaseHold() 441 { 442 bool _hold; 443 while((_hold = m_hold->load(std::memory_order_relaxed))) 444 { 445 m_hold->compare_exchange_strong(_hold, false, std::memory_order_release, 446 std::memory_order_relaxed); 447 } 448 } 449 450 //======================================================================================// 451 452 } // namespace PTL 453