Geant4 Cross Reference |
1 // 1 // 2 // MIT License 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentati 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, includ 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distr 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit perso 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the followin 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permiss 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the S 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 18 // 19 // ------------------------------------------- 19 // --------------------------------------------------------------- 20 // Tasking class implementation 20 // Tasking class implementation 21 // Class Description: 21 // Class Description: 22 // ------------------------------------------ 22 // --------------------------------------------------------------- 23 // Author: Jonathan Madsen 23 // Author: Jonathan Madsen 24 // ------------------------------------------ 24 // --------------------------------------------------------------- 25 25 26 #include "PTL/UserTaskQueue.hh" 26 #include "PTL/UserTaskQueue.hh" 27 27 28 #include "PTL/AutoLock.hh" 28 #include "PTL/AutoLock.hh" 29 #include "PTL/ScopeDestructor.hh" << 30 #include "PTL/TaskGroup.hh" 29 #include "PTL/TaskGroup.hh" 31 #include "PTL/ThreadData.hh" 30 #include "PTL/ThreadData.hh" 32 #include "PTL/ThreadPool.hh" 31 #include "PTL/ThreadPool.hh" >> 32 #include "PTL/Utility.hh" 33 33 34 #include <cassert> 34 #include <cassert> 35 #include <chrono> 35 #include <chrono> 36 #include <functional> 36 #include <functional> 37 #include <iostream> 37 #include <iostream> 38 #include <map> 38 #include <map> 39 #include <stdexcept> 39 #include <stdexcept> >> 40 #include <system_error> 40 #include <thread> 41 #include <thread> 41 #include <utility> 42 #include <utility> 42 43 43 namespace PTL << 44 using namespace PTL; 44 { << 45 45 //============================================ 46 //======================================================================================// 46 47 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers 48 UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent) 48 : VUserTaskQueue(nworkers) 49 : VUserTaskQueue(nworkers) 49 , m_is_clone((parent) != nullptr) 50 , m_is_clone((parent) != nullptr) 50 , m_thread_bin((parent) ? (ThreadPool::get_thi 51 , m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 51 , m_insert_bin((parent) ? (ThreadPool::get_thi 52 , m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 52 , m_hold((parent) ? parent->m_hold : new std:: 53 , m_hold((parent) ? parent->m_hold : new std::atomic_bool(false)) 53 , m_ntasks((parent) ? parent->m_ntasks : new s 54 , m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0)) 54 , m_mutex((parent) ? parent->m_mutex : new Mut 55 , m_mutex((parent) ? parent->m_mutex : new Mutex{}) 55 , m_subqueues((parent) ? parent->m_subqueues : 56 , m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer()) 56 { 57 { 57 // create nthreads + 1 subqueues so there 58 // create nthreads + 1 subqueues so there is always a subqueue available 58 if(!parent) 59 if(!parent) 59 { 60 { 60 for(intmax_t i = 0; i < nworkers + 1; 61 for(intmax_t i = 0; i < nworkers + 1; ++i) 61 m_subqueues->emplace_back(new Task 62 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks)); 62 } 63 } >> 64 >> 65 #if defined(DEBUG) >> 66 if(GetEnv<int>("PTL_VERBOSE", 0) > 3) >> 67 { >> 68 RecursiveAutoLock l(TypeMutex<decltype(std::cout), RecursiveMutex>()); >> 69 std::stringstream ss; >> 70 ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " [" >> 71 << __FUNCTION__ << ":" << __LINE__ << "] " >> 72 << "this = " << this << ", " >> 73 << "clone = " << std::boolalpha << m_is_clone << ", " >> 74 << "thread = " << m_thread_bin << ", " >> 75 << "insert = " << m_insert_bin << ", " >> 76 << "hold = " << m_hold->load() << " @ " << m_hold << ", " >> 77 << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", " >> 78 << "subqueue = " << m_subqueues << ", " >> 79 << "size = " << true_size() << ", " >> 80 << "empty = " << true_empty(); >> 81 std::cout << ss.str() << std::endl; >> 82 } >> 83 #endif 63 } 84 } 64 85 65 //============================================ 86 //======================================================================================// 66 87 67 UserTaskQueue::~UserTaskQueue() 88 UserTaskQueue::~UserTaskQueue() 68 { 89 { 69 if(!m_is_clone) 90 if(!m_is_clone) 70 { 91 { 71 for(auto& itr : *m_subqueues) 92 for(auto& itr : *m_subqueues) 72 { 93 { 73 assert(itr->empty()); 94 assert(itr->empty()); 74 delete itr; 95 delete itr; 75 } 96 } 76 m_subqueues->clear(); 97 m_subqueues->clear(); 77 delete m_hold; 98 delete m_hold; 78 delete m_ntasks; 99 delete m_ntasks; 79 delete m_mutex; 100 delete m_mutex; 80 delete m_subqueues; 101 delete m_subqueues; 81 } 102 } 82 } 103 } 83 104 84 //============================================ 105 //======================================================================================// 85 106 86 void 107 void 87 UserTaskQueue::resize(intmax_t n) 108 UserTaskQueue::resize(intmax_t n) 88 { 109 { 89 if(!m_mutex) 110 if(!m_mutex) 90 throw std::runtime_error("nullptr to m 111 throw std::runtime_error("nullptr to mutex"); 91 AutoLock lk(m_mutex); 112 AutoLock lk(m_mutex); 92 if(m_workers < n) 113 if(m_workers < n) 93 { 114 { 94 while(m_workers < n) 115 while(m_workers < n) 95 { 116 { 96 m_subqueues->emplace_back(new Task 117 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks)); 97 ++m_workers; 118 ++m_workers; 98 } 119 } 99 } 120 } 100 else if(m_workers > n) 121 else if(m_workers > n) 101 { 122 { 102 while(m_workers > n) 123 while(m_workers > n) 103 { 124 { 104 delete m_subqueues->back(); 125 delete m_subqueues->back(); 105 m_subqueues->pop_back(); 126 m_subqueues->pop_back(); 106 --m_workers; 127 --m_workers; 107 } 128 } 108 } 129 } 109 } 130 } 110 131 111 //============================================ 132 //======================================================================================// 112 133 113 VUserTaskQueue* 134 VUserTaskQueue* 114 UserTaskQueue::clone() 135 UserTaskQueue::clone() 115 { 136 { 116 return new UserTaskQueue(workers(), this); 137 return new UserTaskQueue(workers(), this); 117 } 138 } 118 //============================================ 139 //======================================================================================// 119 140 120 intmax_t 141 intmax_t 121 UserTaskQueue::GetThreadBin() const 142 UserTaskQueue::GetThreadBin() const 122 { 143 { 123 // get a thread id number 144 // get a thread id number 124 static thread_local intmax_t tl_bin = 145 static thread_local intmax_t tl_bin = 125 (m_thread_bin + ThreadPool::get_this_t 146 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1); 126 return tl_bin; 147 return tl_bin; 127 } 148 } 128 149 129 //============================================ 150 //======================================================================================// 130 151 131 intmax_t 152 intmax_t 132 UserTaskQueue::GetInsertBin() const 153 UserTaskQueue::GetInsertBin() const 133 { 154 { 134 return (++m_insert_bin % (m_workers + 1)); 155 return (++m_insert_bin % (m_workers + 1)); 135 } 156 } 136 157 137 //============================================ 158 //======================================================================================// 138 159 139 UserTaskQueue::task_pointer 160 UserTaskQueue::task_pointer 140 UserTaskQueue::GetThreadBinTask() 161 UserTaskQueue::GetThreadBinTask() 141 { 162 { 142 intmax_t tbin = GetThreadBin(); 163 intmax_t tbin = GetThreadBin(); 143 TaskSubQueue* task_subq = (*m_subqueues)[t 164 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)]; 144 task_pointer _task = nullptr; 165 task_pointer _task = nullptr; 145 166 146 //---------------------------------------- 167 //------------------------------------------------------------------------// 147 auto get_task = [&]() { 168 auto get_task = [&]() { 148 if(task_subq->AcquireClaim()) 169 if(task_subq->AcquireClaim()) 149 { 170 { 150 // run task 171 // run task 151 _task = task_subq->PopTask(true); 172 _task = task_subq->PopTask(true); 152 // release the claim on the bin 173 // release the claim on the bin 153 task_subq->ReleaseClaim(); 174 task_subq->ReleaseClaim(); 154 } 175 } 155 if(_task) 176 if(_task) 156 --(*m_ntasks); 177 --(*m_ntasks); 157 // return success if valid pointer 178 // return success if valid pointer 158 return (_task != nullptr); 179 return (_task != nullptr); 159 }; 180 }; 160 //---------------------------------------- 181 //------------------------------------------------------------------------// 161 182 162 // while not empty 183 // while not empty 163 while(!task_subq->empty()) 184 while(!task_subq->empty()) 164 { 185 { 165 if(get_task()) 186 if(get_task()) 166 break; 187 break; 167 } 188 } 168 return _task; 189 return _task; 169 } 190 } 170 191 171 //============================================ 192 //======================================================================================// 172 193 173 UserTaskQueue::task_pointer 194 UserTaskQueue::task_pointer 174 UserTaskQueue::GetTask(intmax_t subq, intmax_t 195 UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr) 175 { 196 { 176 // exit if empty 197 // exit if empty 177 if(this->true_empty()) 198 if(this->true_empty()) 178 return nullptr; 199 return nullptr; 179 200 180 // ensure the thread has a bin assignment 201 // ensure the thread has a bin assignment 181 intmax_t tbin = GetThreadBin(); 202 intmax_t tbin = GetThreadBin(); 182 intmax_t n = (subq < 0) ? tbin : subq; 203 intmax_t n = (subq < 0) ? tbin : subq; 183 if(nitr < 1) 204 if(nitr < 1) 184 nitr = (m_workers + 1); // * m_ntasks 205 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed); 185 206 186 if(m_hold->load(std::memory_order_relaxed) 207 if(m_hold->load(std::memory_order_relaxed)) 187 { 208 { 188 return GetThreadBinTask(); 209 return GetThreadBinTask(); 189 } 210 } 190 211 191 task_pointer _task = nullptr; 212 task_pointer _task = nullptr; 192 //---------------------------------------- 213 //------------------------------------------------------------------------// 193 auto get_task = [&](intmax_t _n) { 214 auto get_task = [&](intmax_t _n) { 194 TaskSubQueue* task_subq = (*m_subqueue 215 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)]; 195 // try to acquire a claim for the bin 216 // try to acquire a claim for the bin 196 // if acquired, no other threads will 217 // if acquired, no other threads will access bin until claim is released 197 if(!task_subq->empty() && task_subq->A 218 if(!task_subq->empty() && task_subq->AcquireClaim()) 198 { 219 { 199 // pop task out of bin 220 // pop task out of bin 200 _task = task_subq->PopTask(n == tb 221 _task = task_subq->PopTask(n == tbin); 201 // release the claim on the bin 222 // release the claim on the bin 202 task_subq->ReleaseClaim(); 223 task_subq->ReleaseClaim(); 203 } 224 } 204 if(_task) 225 if(_task) 205 --(*m_ntasks); 226 --(*m_ntasks); 206 // return success if valid pointer 227 // return success if valid pointer 207 return (_task != nullptr); 228 return (_task != nullptr); 208 }; 229 }; 209 //---------------------------------------- 230 //------------------------------------------------------------------------// 210 231 211 // there are num_workers+1 bins so there i 232 // there are num_workers+1 bins so there is always a bin that is open 212 // execute num_workers+2 iterations so the 233 // execute num_workers+2 iterations so the thread checks its bin twice 213 // while(!empty()) 234 // while(!empty()) 214 { 235 { 215 for(intmax_t i = 0; i < nitr; ++i, ++n 236 for(intmax_t i = 0; i < nitr; ++i, ++n) 216 { 237 { 217 if(get_task(n % (m_workers + 1))) 238 if(get_task(n % (m_workers + 1))) 218 return _task; 239 return _task; 219 } 240 } 220 } 241 } 221 242 222 // only reached if looped over all bins (a 243 // only reached if looped over all bins (and looked in own bin twice) 223 // and found no work so return an empty ta 244 // and found no work so return an empty task and the thread will be put to 224 // sleep if there is still no work by the 245 // sleep if there is still no work by the time it reaches its 225 // condition variable 246 // condition variable 226 return _task; 247 return _task; 227 } 248 } 228 249 229 //============================================ 250 //======================================================================================// 230 251 231 intmax_t 252 intmax_t 232 UserTaskQueue::InsertTask(task_pointer&& task, 253 UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq) 233 { 254 { 234 // increment number of tasks 255 // increment number of tasks 235 ++(*m_ntasks); 256 ++(*m_ntasks); 236 257 237 bool spin = m_hold->load(std::memory_o 258 bool spin = m_hold->load(std::memory_order_relaxed); 238 intmax_t tbin = GetThreadBin(); 259 intmax_t tbin = GetThreadBin(); 239 260 240 if(data && data->within_task) 261 if(data && data->within_task) 241 { 262 { 242 subq = tbin; 263 subq = tbin; 243 // spin = true; 264 // spin = true; 244 } 265 } 245 266 246 // subq is -1 unless specified so unless s 267 // subq is -1 unless specified so unless specified 247 // GetInsertBin() call increments a counte 268 // GetInsertBin() call increments a counter and returns 248 // counter % (num_workers + 1) so that tas 269 // counter % (num_workers + 1) so that tasks are distributed evenly 249 // among the bins 270 // among the bins 250 intmax_t n = (subq < 0) ? GetInsertBin() : 271 intmax_t n = (subq < 0) ? GetInsertBin() : subq; 251 272 252 //---------------------------------------- 273 //------------------------------------------------------------------------// 253 auto insert_task = [&](intmax_t _n) { 274 auto insert_task = [&](intmax_t _n) { 254 TaskSubQueue* task_subq = (*m_subqueue 275 TaskSubQueue* task_subq = (*m_subqueues)[_n]; 255 // TaskSubQueue* next_subq = (*m_subqu 276 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)]; 256 // if not threads bin and size differe 277 // if not threads bin and size difference, insert into smaller 257 // if(n != tbin && next_subq->size() < 278 // if(n != tbin && next_subq->size() < task_subq->size()) 258 // task_subq = next_subq; 279 // task_subq = next_subq; 259 // try to acquire a claim for the bin 280 // try to acquire a claim for the bin 260 // if acquired, no other threads will 281 // if acquired, no other threads will access bin until claim is released 261 if(task_subq->AcquireClaim()) 282 if(task_subq->AcquireClaim()) 262 { 283 { 263 // push the task into the bin 284 // push the task into the bin 264 task_subq->PushTask(std::move(task 285 task_subq->PushTask(std::move(task)); 265 // release the claim on the bin 286 // release the claim on the bin 266 task_subq->ReleaseClaim(); 287 task_subq->ReleaseClaim(); 267 // return success 288 // return success 268 return true; 289 return true; 269 } 290 } 270 return false; 291 return false; 271 }; 292 }; 272 //---------------------------------------- 293 //------------------------------------------------------------------------// 273 294 274 // if not in "hold/spin mode", where threa 295 // if not in "hold/spin mode", where thread only inserts tasks into 275 // specified bin, then move onto next bin 296 // specified bin, then move onto next bin 276 // 297 // 277 if(spin) 298 if(spin) 278 { 299 { 279 n = n % (m_workers + 1); 300 n = n % (m_workers + 1); 280 while(!insert_task(n)) 301 while(!insert_task(n)) 281 ; 302 ; 282 return n; 303 return n; 283 } 304 } 284 305 285 // there are num_workers+1 bins so there i 306 // there are num_workers+1 bins so there is always a bin that is open 286 // execute num_workers+2 iterations so the 307 // execute num_workers+2 iterations so the thread checks its bin twice 287 while(true) 308 while(true) 288 { 309 { 289 auto _n = (n++) % (m_workers + 1); 310 auto _n = (n++) % (m_workers + 1); 290 if(insert_task(_n)) 311 if(insert_task(_n)) 291 return _n; 312 return _n; 292 } 313 } >> 314 return GetThreadBin(); 293 } 315 } 294 316 295 //============================================ 317 //======================================================================================// 296 318 297 void 319 void 298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* 320 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func) 299 { 321 { 300 using task_group_type = TaskGroup<int 322 using task_group_type = TaskGroup<int, int>; 301 using thread_execute_map_t = std::map<int6 323 using thread_execute_map_t = std::map<int64_t, bool>; 302 324 303 if(!tp->is_alive()) 325 if(!tp->is_alive()) 304 { 326 { 305 func(); 327 func(); 306 return; 328 return; 307 } 329 } 308 330 309 task_group_type tg{ [](int& ref, int i) { 331 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp }; 310 332 311 // wait for all threads to finish any work 333 // wait for all threads to finish any work 312 // NOTE: will cause deadlock if called fro 334 // NOTE: will cause deadlock if called from a task 313 while(tp->get_active_threads_count() > 0) 335 while(tp->get_active_threads_count() > 0) 314 ThisThread::sleep_for(std::chrono::mil 336 ThisThread::sleep_for(std::chrono::milliseconds(10)); 315 337 316 thread_execute_map_t thread 338 thread_execute_map_t thread_execute_map{}; 317 std::vector<std::shared_ptr<VTask>> _tasks 339 std::vector<std::shared_ptr<VTask>> _tasks{}; 318 _tasks.reserve(m_workers + 1); 340 _tasks.reserve(m_workers + 1); 319 341 320 AcquireHold(); 342 AcquireHold(); 321 for(int i = 0; i < (m_workers + 1); ++i) 343 for(int i = 0; i < (m_workers + 1); ++i) 322 { 344 { 323 if(i == GetThreadBin()) 345 if(i == GetThreadBin()) 324 continue; 346 continue; 325 347 326 //------------------------------------ 348 //--------------------------------------------------------------------// 327 auto thread_specific_func = [&]() { 349 auto thread_specific_func = [&]() { 328 ScopeDestructor _dtor = tg.get_sco 350 ScopeDestructor _dtor = tg.get_scope_destructor(); 329 static Mutex _mtx; 351 static Mutex _mtx; 330 _mtx.lock(); 352 _mtx.lock(); 331 bool& _executed = thread_execute_m 353 bool& _executed = thread_execute_map[GetThreadBin()]; 332 _mtx.unlock(); 354 _mtx.unlock(); 333 if(!_executed) 355 if(!_executed) 334 { 356 { 335 func(); 357 func(); 336 _executed = true; 358 _executed = true; 337 return 1; 359 return 1; 338 } 360 } 339 return 0; 361 return 0; 340 }; 362 }; 341 //------------------------------------ 363 //--------------------------------------------------------------------// 342 364 343 InsertTask(tg.wrap(thread_specific_fun 365 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i); 344 } 366 } 345 367 346 tp->notify_all(); 368 tp->notify_all(); 347 int nexecuted = tg.join(); 369 int nexecuted = tg.join(); 348 if(nexecuted != m_workers) 370 if(nexecuted != m_workers) 349 { 371 { 350 std::stringstream msg; 372 std::stringstream msg; 351 msg << "Failure executing routine on a 373 msg << "Failure executing routine on all threads! Only " << nexecuted 352 << " threads executed function out 374 << " threads executed function out of " << m_workers << " workers"; 353 std::cerr << msg.str() << std::endl; 375 std::cerr << msg.str() << std::endl; 354 } 376 } 355 ReleaseHold(); 377 ReleaseHold(); 356 } 378 } 357 379 358 //============================================ 380 //======================================================================================// 359 381 360 void 382 void 361 UserTaskQueue::ExecuteOnSpecificThreads(Thread 383 UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp, 362 functi 384 function_type func) 363 { 385 { 364 using task_group_type = TaskGroup<int 386 using task_group_type = TaskGroup<int, int>; 365 using thread_execute_map_t = std::map<int6 387 using thread_execute_map_t = std::map<int64_t, bool>; 366 388 367 task_group_type tg{ [](int& ref, int i) { 389 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp }; 368 390 369 // wait for all threads to finish any work 391 // wait for all threads to finish any work 370 // NOTE: will cause deadlock if called fro 392 // NOTE: will cause deadlock if called from a task 371 while(tp->get_active_threads_count() > 0) 393 while(tp->get_active_threads_count() > 0) 372 ThisThread::sleep_for(std::chrono::mil 394 ThisThread::sleep_for(std::chrono::milliseconds(10)); 373 395 374 if(!tp->is_alive()) 396 if(!tp->is_alive()) 375 { 397 { 376 func(); 398 func(); 377 return; 399 return; 378 } 400 } 379 401 380 thread_execute_map_t thread_execute_map{}; 402 thread_execute_map_t thread_execute_map{}; 381 403 382 //======================================== 404 //========================================================================// 383 // wrap the function so that it will only 405 // wrap the function so that it will only be executed if the thread 384 // has an ID in the set 406 // has an ID in the set 385 auto thread_specific_func = [&]() { 407 auto thread_specific_func = [&]() { 386 ScopeDestructor _dtor = tg.get_scope_d 408 ScopeDestructor _dtor = tg.get_scope_destructor(); 387 static Mutex _mtx; 409 static Mutex _mtx; 388 _mtx.lock(); 410 _mtx.lock(); 389 bool& _executed = thread_execute_map[G 411 bool& _executed = thread_execute_map[GetThreadBin()]; 390 _mtx.unlock(); 412 _mtx.unlock(); 391 if(!_executed && tid_set.count(ThisThr 413 if(!_executed && tid_set.count(ThisThread::get_id()) > 0) 392 { 414 { 393 func(); 415 func(); 394 _executed = true; 416 _executed = true; 395 return 1; 417 return 1; 396 } 418 } 397 return 0; 419 return 0; 398 }; 420 }; 399 //======================================== 421 //========================================================================// 400 422 401 if(tid_set.count(ThisThread::get_id()) > 0 423 if(tid_set.count(ThisThread::get_id()) > 0) 402 func(); 424 func(); 403 425 404 AcquireHold(); 426 AcquireHold(); 405 for(int i = 0; i < (m_workers + 1); ++i) 427 for(int i = 0; i < (m_workers + 1); ++i) 406 { 428 { 407 if(i == GetThreadBin()) 429 if(i == GetThreadBin()) 408 continue; 430 continue; 409 431 410 InsertTask(tg.wrap(thread_specific_fun 432 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i); 411 } 433 } 412 tp->notify_all(); 434 tp->notify_all(); 413 decltype(tid_set.size()) nexecuted = tg.jo 435 decltype(tid_set.size()) nexecuted = tg.join(); 414 if(nexecuted != tid_set.size()) 436 if(nexecuted != tid_set.size()) 415 { 437 { 416 std::stringstream msg; 438 std::stringstream msg; 417 msg << "Failure executing routine on s 439 msg << "Failure executing routine on specific threads! Only " << nexecuted 418 << " threads executed function out 440 << " threads executed function out of " << tid_set.size() << " workers"; 419 std::cerr << msg.str() << std::endl; 441 std::cerr << msg.str() << std::endl; 420 } 442 } 421 ReleaseHold(); 443 ReleaseHold(); 422 } 444 } 423 445 424 //============================================ 446 //======================================================================================// 425 447 426 void 448 void 427 UserTaskQueue::AcquireHold() 449 UserTaskQueue::AcquireHold() 428 { 450 { 429 bool _hold; 451 bool _hold; 430 while(!(_hold = m_hold->load(std::memory_o 452 while(!(_hold = m_hold->load(std::memory_order_relaxed))) 431 { 453 { 432 m_hold->compare_exchange_strong(_hold, 454 m_hold->compare_exchange_strong(_hold, true, std::memory_order_release, 433 std::m 455 std::memory_order_relaxed); 434 } 456 } 435 } 457 } 436 458 437 //============================================ 459 //======================================================================================// 438 460 439 void 461 void 440 UserTaskQueue::ReleaseHold() 462 UserTaskQueue::ReleaseHold() 441 { 463 { 442 bool _hold; 464 bool _hold; 443 while((_hold = m_hold->load(std::memory_or 465 while((_hold = m_hold->load(std::memory_order_relaxed))) 444 { 466 { 445 m_hold->compare_exchange_strong(_hold, 467 m_hold->compare_exchange_strong(_hold, false, std::memory_order_release, 446 std::m 468 std::memory_order_relaxed); 447 } 469 } 448 } 470 } 449 471 450 //============================================ 472 //======================================================================================// 451 << 452 } // namespace PTL << 453 473