Geant4 Cross Reference |
1 // 1 // 2 // MIT License 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentati 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, includ 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distr 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit perso 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the followin 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permiss 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the S 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 18 // 19 // ------------------------------------------- 19 // --------------------------------------------------------------- 20 // Tasking class implementation 20 // Tasking class implementation 21 // Class Description: 21 // Class Description: 22 // ------------------------------------------ 22 // --------------------------------------------------------------- 23 // Author: Jonathan Madsen 23 // Author: Jonathan Madsen 24 // ------------------------------------------ 24 // --------------------------------------------------------------- 25 25 26 #include "PTL/UserTaskQueue.hh" 26 #include "PTL/UserTaskQueue.hh" 27 << 27 #include "PTL/Task.hh" 28 #include "PTL/AutoLock.hh" << 29 #include "PTL/ScopeDestructor.hh" << 30 #include "PTL/TaskGroup.hh" 28 #include "PTL/TaskGroup.hh" 31 #include "PTL/ThreadData.hh" << 32 #include "PTL/ThreadPool.hh" 29 #include "PTL/ThreadPool.hh" 33 30 34 #include <cassert> 31 #include <cassert> 35 #include <chrono> << 36 #include <functional> << 37 #include <iostream> << 38 #include <map> << 39 #include <stdexcept> << 40 #include <thread> << 41 #include <utility> << 42 32 43 namespace PTL << 33 using namespace PTL; 44 { << 34 45 //============================================ 35 //======================================================================================// 46 36 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers 37 UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent) 48 : VUserTaskQueue(nworkers) 38 : VUserTaskQueue(nworkers) 49 , m_is_clone((parent) != nullptr) << 39 , m_is_clone((parent) ? true : false) 50 , m_thread_bin((parent) ? (ThreadPool::get_thi 40 , m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 51 , m_insert_bin((parent) ? (ThreadPool::get_thi 41 , m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 52 , m_hold((parent) ? parent->m_hold : new std:: 42 , m_hold((parent) ? parent->m_hold : new std::atomic_bool(false)) 53 , m_ntasks((parent) ? parent->m_ntasks : new s 43 , m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0)) 54 , m_mutex((parent) ? parent->m_mutex : new Mut << 55 , m_subqueues((parent) ? parent->m_subqueues : 44 , m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer()) 56 { 45 { 57 // create nthreads + 1 subqueues so there 46 // create nthreads + 1 subqueues so there is always a subqueue available 58 if(!parent) 47 if(!parent) 59 { 48 { 60 for(intmax_t i = 0; i < nworkers + 1; 49 for(intmax_t i = 0; i < nworkers + 1; ++i) 61 m_subqueues->emplace_back(new Task 50 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks)); 62 } 51 } >> 52 >> 53 #if defined(DEBUG) >> 54 if(GetEnv<int>("PTL_VERBOSE", 0) > 3) >> 55 { >> 56 RecursiveAutoLock l(TypeMutex<decltype(std::cout), RecursiveMutex>()); >> 57 std::stringstream ss; >> 58 ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " [" >> 59 << __FUNCTION__ << ":" << __LINE__ << "] " >> 60 << "this = " << this << ", " >> 61 << "clone = " << std::boolalpha << m_is_clone << ", " >> 62 << "thread = " << m_thread_bin << ", " >> 63 << "insert = " << m_insert_bin << ", " >> 64 << "hold = " << m_hold->load() << " @ " << m_hold << ", " >> 65 << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", " >> 66 << "subqueue = " << m_subqueues << ", " >> 67 << "size = " << true_size() << ", " >> 68 << "empty = " << true_empty(); >> 69 std::cout << ss.str() << std::endl; >> 70 } >> 71 #endif 63 } 72 } 64 73 65 //============================================ 74 //======================================================================================// 66 75 67 UserTaskQueue::~UserTaskQueue() 76 UserTaskQueue::~UserTaskQueue() 68 { 77 { 69 if(!m_is_clone) 78 if(!m_is_clone) 70 { 79 { 71 for(auto& itr : *m_subqueues) 80 for(auto& itr : *m_subqueues) 72 { 81 { 73 assert(itr->empty()); 82 assert(itr->empty()); 74 delete itr; 83 delete itr; 75 } 84 } 76 m_subqueues->clear(); 85 m_subqueues->clear(); 77 delete m_hold; 86 delete m_hold; 78 delete m_ntasks; 87 delete m_ntasks; 79 delete m_mutex; << 80 delete m_subqueues; 88 delete m_subqueues; 81 } 89 } 82 } 90 } 83 91 84 //============================================ 92 //======================================================================================// 85 93 86 void 94 void 87 UserTaskQueue::resize(intmax_t n) 95 UserTaskQueue::resize(intmax_t n) 88 { 96 { 89 if(!m_mutex) << 97 AutoLock l(m_mutex); 90 throw std::runtime_error("nullptr to m << 91 AutoLock lk(m_mutex); << 92 if(m_workers < n) 98 if(m_workers < n) 93 { 99 { 94 while(m_workers < n) 100 while(m_workers < n) 95 { 101 { 96 m_subqueues->emplace_back(new Task 102 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks)); 97 ++m_workers; 103 ++m_workers; 98 } 104 } 99 } 105 } 100 else if(m_workers > n) 106 else if(m_workers > n) 101 { 107 { 102 while(m_workers > n) 108 while(m_workers > n) 103 { 109 { 104 delete m_subqueues->back(); 110 delete m_subqueues->back(); 105 m_subqueues->pop_back(); 111 m_subqueues->pop_back(); 106 --m_workers; 112 --m_workers; 107 } 113 } 108 } 114 } 109 } 115 } 110 116 111 //============================================ 117 //======================================================================================// 112 118 113 VUserTaskQueue* 119 VUserTaskQueue* 114 UserTaskQueue::clone() 120 UserTaskQueue::clone() 115 { 121 { 116 return new UserTaskQueue(workers(), this); 122 return new UserTaskQueue(workers(), this); 117 } 123 } 118 //============================================ 124 //======================================================================================// 119 125 120 intmax_t 126 intmax_t 121 UserTaskQueue::GetThreadBin() const 127 UserTaskQueue::GetThreadBin() const 122 { 128 { 123 // get a thread id number 129 // get a thread id number 124 static thread_local intmax_t tl_bin = 130 static thread_local intmax_t tl_bin = 125 (m_thread_bin + ThreadPool::get_this_t 131 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1); 126 return tl_bin; 132 return tl_bin; 127 } 133 } 128 134 129 //============================================ 135 //======================================================================================// 130 136 131 intmax_t 137 intmax_t 132 UserTaskQueue::GetInsertBin() const 138 UserTaskQueue::GetInsertBin() const 133 { 139 { 134 return (++m_insert_bin % (m_workers + 1)); 140 return (++m_insert_bin % (m_workers + 1)); 135 } 141 } 136 142 137 //============================================ 143 //======================================================================================// 138 144 139 UserTaskQueue::task_pointer 145 UserTaskQueue::task_pointer 140 UserTaskQueue::GetThreadBinTask() 146 UserTaskQueue::GetThreadBinTask() 141 { 147 { 142 intmax_t tbin = GetThreadBin(); 148 intmax_t tbin = GetThreadBin(); 143 TaskSubQueue* task_subq = (*m_subqueues)[t 149 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)]; 144 task_pointer _task = nullptr; 150 task_pointer _task = nullptr; 145 151 146 //---------------------------------------- 152 //------------------------------------------------------------------------// 147 auto get_task = [&]() { 153 auto get_task = [&]() { 148 if(task_subq->AcquireClaim()) 154 if(task_subq->AcquireClaim()) 149 { 155 { 150 // run task 156 // run task 151 _task = task_subq->PopTask(true); 157 _task = task_subq->PopTask(true); 152 // release the claim on the bin 158 // release the claim on the bin 153 task_subq->ReleaseClaim(); 159 task_subq->ReleaseClaim(); 154 } 160 } 155 if(_task) 161 if(_task) 156 --(*m_ntasks); 162 --(*m_ntasks); 157 // return success if valid pointer 163 // return success if valid pointer 158 return (_task != nullptr); 164 return (_task != nullptr); 159 }; 165 }; 160 //---------------------------------------- 166 //------------------------------------------------------------------------// 161 167 162 // while not empty 168 // while not empty 163 while(!task_subq->empty()) 169 while(!task_subq->empty()) 164 { 170 { 165 if(get_task()) 171 if(get_task()) 166 break; 172 break; 167 } 173 } 168 return _task; 174 return _task; 169 } 175 } 170 176 171 //============================================ 177 //======================================================================================// 172 178 173 UserTaskQueue::task_pointer 179 UserTaskQueue::task_pointer 174 UserTaskQueue::GetTask(intmax_t subq, intmax_t 180 UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr) 175 { 181 { 176 // exit if empty 182 // exit if empty 177 if(this->true_empty()) 183 if(this->true_empty()) 178 return nullptr; 184 return nullptr; 179 185 180 // ensure the thread has a bin assignment 186 // ensure the thread has a bin assignment 181 intmax_t tbin = GetThreadBin(); 187 intmax_t tbin = GetThreadBin(); 182 intmax_t n = (subq < 0) ? tbin : subq; 188 intmax_t n = (subq < 0) ? tbin : subq; 183 if(nitr < 1) 189 if(nitr < 1) 184 nitr = (m_workers + 1); // * m_ntasks 190 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed); 185 191 186 if(m_hold->load(std::memory_order_relaxed) 192 if(m_hold->load(std::memory_order_relaxed)) 187 { 193 { 188 return GetThreadBinTask(); 194 return GetThreadBinTask(); 189 } 195 } 190 196 191 task_pointer _task = nullptr; 197 task_pointer _task = nullptr; 192 //---------------------------------------- 198 //------------------------------------------------------------------------// 193 auto get_task = [&](intmax_t _n) { 199 auto get_task = [&](intmax_t _n) { 194 TaskSubQueue* task_subq = (*m_subqueue 200 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)]; 195 // try to acquire a claim for the bin 201 // try to acquire a claim for the bin 196 // if acquired, no other threads will 202 // if acquired, no other threads will access bin until claim is released 197 if(!task_subq->empty() && task_subq->A 203 if(!task_subq->empty() && task_subq->AcquireClaim()) 198 { 204 { 199 // pop task out of bin 205 // pop task out of bin 200 _task = task_subq->PopTask(n == tb 206 _task = task_subq->PopTask(n == tbin); 201 // release the claim on the bin 207 // release the claim on the bin 202 task_subq->ReleaseClaim(); 208 task_subq->ReleaseClaim(); 203 } 209 } 204 if(_task) 210 if(_task) 205 --(*m_ntasks); 211 --(*m_ntasks); 206 // return success if valid pointer 212 // return success if valid pointer 207 return (_task != nullptr); 213 return (_task != nullptr); 208 }; 214 }; 209 //---------------------------------------- 215 //------------------------------------------------------------------------// 210 216 211 // there are num_workers+1 bins so there i 217 // there are num_workers+1 bins so there is always a bin that is open 212 // execute num_workers+2 iterations so the 218 // execute num_workers+2 iterations so the thread checks its bin twice 213 // while(!empty()) 219 // while(!empty()) 214 { 220 { 215 for(intmax_t i = 0; i < nitr; ++i, ++n 221 for(intmax_t i = 0; i < nitr; ++i, ++n) 216 { 222 { 217 if(get_task(n % (m_workers + 1))) 223 if(get_task(n % (m_workers + 1))) 218 return _task; 224 return _task; 219 } 225 } 220 } 226 } 221 227 222 // only reached if looped over all bins (a 228 // only reached if looped over all bins (and looked in own bin twice) 223 // and found no work so return an empty ta 229 // and found no work so return an empty task and the thread will be put to 224 // sleep if there is still no work by the 230 // sleep if there is still no work by the time it reaches its 225 // condition variable 231 // condition variable 226 return _task; 232 return _task; 227 } 233 } 228 234 229 //============================================ 235 //======================================================================================// 230 236 231 intmax_t 237 intmax_t 232 UserTaskQueue::InsertTask(task_pointer&& task, 238 UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq) 233 { 239 { 234 // increment number of tasks 240 // increment number of tasks 235 ++(*m_ntasks); 241 ++(*m_ntasks); 236 242 237 bool spin = m_hold->load(std::memory_o 243 bool spin = m_hold->load(std::memory_order_relaxed); 238 intmax_t tbin = GetThreadBin(); 244 intmax_t tbin = GetThreadBin(); 239 245 240 if(data && data->within_task) 246 if(data && data->within_task) 241 { 247 { 242 subq = tbin; 248 subq = tbin; 243 // spin = true; 249 // spin = true; 244 } 250 } 245 251 246 // subq is -1 unless specified so unless s 252 // subq is -1 unless specified so unless specified 247 // GetInsertBin() call increments a counte 253 // GetInsertBin() call increments a counter and returns 248 // counter % (num_workers + 1) so that tas 254 // counter % (num_workers + 1) so that tasks are distributed evenly 249 // among the bins 255 // among the bins 250 intmax_t n = (subq < 0) ? GetInsertBin() : 256 intmax_t n = (subq < 0) ? GetInsertBin() : subq; 251 257 252 //---------------------------------------- 258 //------------------------------------------------------------------------// 253 auto insert_task = [&](intmax_t _n) { 259 auto insert_task = [&](intmax_t _n) { 254 TaskSubQueue* task_subq = (*m_subqueue 260 TaskSubQueue* task_subq = (*m_subqueues)[_n]; 255 // TaskSubQueue* next_subq = (*m_subqu 261 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)]; 256 // if not threads bin and size differe 262 // if not threads bin and size difference, insert into smaller 257 // if(n != tbin && next_subq->size() < 263 // if(n != tbin && next_subq->size() < task_subq->size()) 258 // task_subq = next_subq; 264 // task_subq = next_subq; 259 // try to acquire a claim for the bin 265 // try to acquire a claim for the bin 260 // if acquired, no other threads will 266 // if acquired, no other threads will access bin until claim is released 261 if(task_subq->AcquireClaim()) 267 if(task_subq->AcquireClaim()) 262 { 268 { 263 // push the task into the bin 269 // push the task into the bin 264 task_subq->PushTask(std::move(task 270 task_subq->PushTask(std::move(task)); 265 // release the claim on the bin 271 // release the claim on the bin 266 task_subq->ReleaseClaim(); 272 task_subq->ReleaseClaim(); 267 // return success 273 // return success 268 return true; 274 return true; 269 } 275 } 270 return false; 276 return false; 271 }; 277 }; 272 //---------------------------------------- 278 //------------------------------------------------------------------------// 273 279 274 // if not in "hold/spin mode", where threa 280 // if not in "hold/spin mode", where thread only inserts tasks into 275 // specified bin, then move onto next bin 281 // specified bin, then move onto next bin 276 // 282 // 277 if(spin) 283 if(spin) 278 { 284 { 279 n = n % (m_workers + 1); 285 n = n % (m_workers + 1); 280 while(!insert_task(n)) 286 while(!insert_task(n)) 281 ; 287 ; 282 return n; 288 return n; 283 } 289 } 284 290 285 // there are num_workers+1 bins so there i 291 // there are num_workers+1 bins so there is always a bin that is open 286 // execute num_workers+2 iterations so the 292 // execute num_workers+2 iterations so the thread checks its bin twice 287 while(true) 293 while(true) 288 { 294 { 289 auto _n = (n++) % (m_workers + 1); 295 auto _n = (n++) % (m_workers + 1); 290 if(insert_task(_n)) 296 if(insert_task(_n)) 291 return _n; 297 return _n; 292 } 298 } >> 299 return GetThreadBin(); 293 } 300 } 294 301 295 //============================================ 302 //======================================================================================// 296 303 297 void 304 void 298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* 305 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func) 299 { 306 { 300 using task_group_type = TaskGroup<int 307 using task_group_type = TaskGroup<int, int>; 301 using thread_execute_map_t = std::map<int6 308 using thread_execute_map_t = std::map<int64_t, bool>; 302 309 303 if(!tp->is_alive()) 310 if(!tp->is_alive()) 304 { 311 { 305 func(); 312 func(); 306 return; 313 return; 307 } 314 } 308 315 309 task_group_type tg{ [](int& ref, int i) { 316 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp }; 310 317 311 // wait for all threads to finish any work 318 // wait for all threads to finish any work 312 // NOTE: will cause deadlock if called fro 319 // NOTE: will cause deadlock if called from a task 313 while(tp->get_active_threads_count() > 0) 320 while(tp->get_active_threads_count() > 0) 314 ThisThread::sleep_for(std::chrono::mil 321 ThisThread::sleep_for(std::chrono::milliseconds(10)); 315 322 316 thread_execute_map_t thread 323 thread_execute_map_t thread_execute_map{}; 317 std::vector<std::shared_ptr<VTask>> _tasks 324 std::vector<std::shared_ptr<VTask>> _tasks{}; 318 _tasks.reserve(m_workers + 1); 325 _tasks.reserve(m_workers + 1); 319 326 320 AcquireHold(); 327 AcquireHold(); 321 for(int i = 0; i < (m_workers + 1); ++i) 328 for(int i = 0; i < (m_workers + 1); ++i) 322 { 329 { 323 if(i == GetThreadBin()) 330 if(i == GetThreadBin()) 324 continue; 331 continue; 325 332 326 //------------------------------------ 333 //--------------------------------------------------------------------// 327 auto thread_specific_func = [&]() { 334 auto thread_specific_func = [&]() { 328 ScopeDestructor _dtor = tg.get_sco 335 ScopeDestructor _dtor = tg.get_scope_destructor(); 329 static Mutex _mtx; 336 static Mutex _mtx; 330 _mtx.lock(); 337 _mtx.lock(); 331 bool& _executed = thread_execute_m 338 bool& _executed = thread_execute_map[GetThreadBin()]; 332 _mtx.unlock(); 339 _mtx.unlock(); 333 if(!_executed) 340 if(!_executed) 334 { 341 { 335 func(); 342 func(); 336 _executed = true; 343 _executed = true; 337 return 1; 344 return 1; 338 } 345 } 339 return 0; 346 return 0; 340 }; 347 }; 341 //------------------------------------ 348 //--------------------------------------------------------------------// 342 349 343 InsertTask(tg.wrap(thread_specific_fun 350 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i); 344 } 351 } 345 352 346 tp->notify_all(); 353 tp->notify_all(); 347 int nexecuted = tg.join(); 354 int nexecuted = tg.join(); 348 if(nexecuted != m_workers) 355 if(nexecuted != m_workers) 349 { 356 { 350 std::stringstream msg; 357 std::stringstream msg; 351 msg << "Failure executing routine on a 358 msg << "Failure executing routine on all threads! Only " << nexecuted 352 << " threads executed function out 359 << " threads executed function out of " << m_workers << " workers"; 353 std::cerr << msg.str() << std::endl; 360 std::cerr << msg.str() << std::endl; 354 } 361 } 355 ReleaseHold(); 362 ReleaseHold(); 356 } 363 } 357 364 358 //============================================ 365 //======================================================================================// 359 366 360 void 367 void 361 UserTaskQueue::ExecuteOnSpecificThreads(Thread 368 UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp, 362 functi 369 function_type func) 363 { 370 { 364 using task_group_type = TaskGroup<int 371 using task_group_type = TaskGroup<int, int>; 365 using thread_execute_map_t = std::map<int6 372 using thread_execute_map_t = std::map<int64_t, bool>; 366 373 367 task_group_type tg{ [](int& ref, int i) { 374 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp }; 368 375 369 // wait for all threads to finish any work 376 // wait for all threads to finish any work 370 // NOTE: will cause deadlock if called fro 377 // NOTE: will cause deadlock if called from a task 371 while(tp->get_active_threads_count() > 0) 378 while(tp->get_active_threads_count() > 0) 372 ThisThread::sleep_for(std::chrono::mil 379 ThisThread::sleep_for(std::chrono::milliseconds(10)); 373 380 374 if(!tp->is_alive()) 381 if(!tp->is_alive()) 375 { 382 { 376 func(); 383 func(); 377 return; 384 return; 378 } 385 } 379 386 380 thread_execute_map_t thread_execute_map{}; 387 thread_execute_map_t thread_execute_map{}; 381 388 382 //======================================== 389 //========================================================================// 383 // wrap the function so that it will only 390 // wrap the function so that it will only be executed if the thread 384 // has an ID in the set 391 // has an ID in the set 385 auto thread_specific_func = [&]() { 392 auto thread_specific_func = [&]() { 386 ScopeDestructor _dtor = tg.get_scope_d 393 ScopeDestructor _dtor = tg.get_scope_destructor(); 387 static Mutex _mtx; 394 static Mutex _mtx; 388 _mtx.lock(); 395 _mtx.lock(); 389 bool& _executed = thread_execute_map[G 396 bool& _executed = thread_execute_map[GetThreadBin()]; 390 _mtx.unlock(); 397 _mtx.unlock(); 391 if(!_executed && tid_set.count(ThisThr 398 if(!_executed && tid_set.count(ThisThread::get_id()) > 0) 392 { 399 { 393 func(); 400 func(); 394 _executed = true; 401 _executed = true; 395 return 1; 402 return 1; 396 } 403 } 397 return 0; 404 return 0; 398 }; 405 }; 399 //======================================== 406 //========================================================================// 400 407 401 if(tid_set.count(ThisThread::get_id()) > 0 408 if(tid_set.count(ThisThread::get_id()) > 0) 402 func(); 409 func(); 403 410 404 AcquireHold(); 411 AcquireHold(); 405 for(int i = 0; i < (m_workers + 1); ++i) 412 for(int i = 0; i < (m_workers + 1); ++i) 406 { 413 { 407 if(i == GetThreadBin()) 414 if(i == GetThreadBin()) 408 continue; 415 continue; 409 416 410 InsertTask(tg.wrap(thread_specific_fun 417 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i); 411 } 418 } 412 tp->notify_all(); 419 tp->notify_all(); 413 decltype(tid_set.size()) nexecuted = tg.jo 420 decltype(tid_set.size()) nexecuted = tg.join(); 414 if(nexecuted != tid_set.size()) 421 if(nexecuted != tid_set.size()) 415 { 422 { 416 std::stringstream msg; 423 std::stringstream msg; 417 msg << "Failure executing routine on s 424 msg << "Failure executing routine on specific threads! Only " << nexecuted 418 << " threads executed function out 425 << " threads executed function out of " << tid_set.size() << " workers"; 419 std::cerr << msg.str() << std::endl; 426 std::cerr << msg.str() << std::endl; 420 } 427 } 421 ReleaseHold(); 428 ReleaseHold(); 422 } 429 } 423 430 424 //============================================ 431 //======================================================================================// 425 432 426 void 433 void 427 UserTaskQueue::AcquireHold() 434 UserTaskQueue::AcquireHold() 428 { 435 { 429 bool _hold; 436 bool _hold; 430 while(!(_hold = m_hold->load(std::memory_o 437 while(!(_hold = m_hold->load(std::memory_order_relaxed))) 431 { 438 { 432 m_hold->compare_exchange_strong(_hold, 439 m_hold->compare_exchange_strong(_hold, true, std::memory_order_release, 433 std::m 440 std::memory_order_relaxed); 434 } 441 } 435 } 442 } 436 443 437 //============================================ 444 //======================================================================================// 438 445 439 void 446 void 440 UserTaskQueue::ReleaseHold() 447 UserTaskQueue::ReleaseHold() 441 { 448 { 442 bool _hold; 449 bool _hold; 443 while((_hold = m_hold->load(std::memory_or 450 while((_hold = m_hold->load(std::memory_order_relaxed))) 444 { 451 { 445 m_hold->compare_exchange_strong(_hold, 452 m_hold->compare_exchange_strong(_hold, false, std::memory_order_release, 446 std::m 453 std::memory_order_relaxed); 447 } 454 } 448 } 455 } 449 456 450 //============================================ 457 //======================================================================================// 451 << 452 } // namespace PTL << 453 458