Geant4 Cross Reference |
1 // 1 // 2 // MIT License 2 // MIT License 3 // Copyright (c) 2020 Jonathan R. Madsen 3 // Copyright (c) 2020 Jonathan R. Madsen 4 // Permission is hereby granted, free of charg 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentati 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, includ 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distr 7 // to use, copy, modify, merge, publish, distribute, sublicense, and 8 // copies of the Software, and to permit perso 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the followin 9 // furnished to do so, subject to the following conditions: 10 // The above copyright notice and this permiss 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the S 11 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPR 12 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILIT 13 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SH 14 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 15 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARIS 16 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALI 17 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 18 // 18 // 19 // ------------------------------------------- 19 // --------------------------------------------------------------- 20 // Tasking class implementation 20 // Tasking class implementation 21 // Class Description: 21 // Class Description: 22 // ------------------------------------------ 22 // --------------------------------------------------------------- 23 // Author: Jonathan Madsen 23 // Author: Jonathan Madsen 24 // ------------------------------------------ 24 // --------------------------------------------------------------- 25 25 26 #include "PTL/UserTaskQueue.hh" 26 #include "PTL/UserTaskQueue.hh" 27 << 27 #include "PTL/Task.hh" 28 #include "PTL/AutoLock.hh" << 29 #include "PTL/ScopeDestructor.hh" << 30 #include "PTL/TaskGroup.hh" 28 #include "PTL/TaskGroup.hh" 31 #include "PTL/ThreadData.hh" << 32 #include "PTL/ThreadPool.hh" 29 #include "PTL/ThreadPool.hh" 33 30 34 #include <cassert> 31 #include <cassert> 35 #include <chrono> << 36 #include <functional> << 37 #include <iostream> << 38 #include <map> << 39 #include <stdexcept> << 40 #include <thread> << 41 #include <utility> << 42 32 43 namespace PTL << 33 using namespace PTL; 44 { << 34 45 //============================================ 35 //======================================================================================// 46 36 47 UserTaskQueue::UserTaskQueue(intmax_t nworkers 37 UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent) 48 : VUserTaskQueue(nworkers) 38 : VUserTaskQueue(nworkers) 49 , m_is_clone((parent) != nullptr) << 39 , m_is_clone((parent) ? true : false) 50 , m_thread_bin((parent) ? (ThreadPool::get_thi 40 , m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 51 , m_insert_bin((parent) ? (ThreadPool::get_thi 41 , m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0) 52 , m_hold((parent) ? parent->m_hold : new std:: 42 , m_hold((parent) ? parent->m_hold : new std::atomic_bool(false)) 53 , m_ntasks((parent) ? parent->m_ntasks : new s 43 , m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0)) 54 , m_mutex((parent) ? parent->m_mutex : new Mut << 55 , m_subqueues((parent) ? parent->m_subqueues : 44 , m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer()) 56 { 45 { 57 // create nthreads + 1 subqueues so there 46 // create nthreads + 1 subqueues so there is always a subqueue available 58 if(!parent) 47 if(!parent) 59 { 48 { 60 for(intmax_t i = 0; i < nworkers + 1; 49 for(intmax_t i = 0; i < nworkers + 1; ++i) 61 m_subqueues->emplace_back(new Task << 50 m_subqueues->push_back(new TaskSubQueue(m_ntasks)); >> 51 } >> 52 >> 53 #if defined(DEBUG) >> 54 if(GetEnv<int>("PTL_VERBOSE", 0) > 3) >> 55 { >> 56 RecursiveAutoLock l(TypeRecursiveMutex<decltype(std::cout)>()); >> 57 std::stringstream ss; >> 58 ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " [" >> 59 << __FUNCTION__ << ":" << __LINE__ << "] " >> 60 << "this = " << this << ", " >> 61 << "clone = " << std::boolalpha << m_is_clone << ", " >> 62 << "thread = " << m_thread_bin << ", " >> 63 << "insert = " << m_insert_bin << ", " >> 64 << "hold = " << m_hold->load() << " @ " << m_hold << ", " >> 65 << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", " >> 66 << "subqueue = " << m_subqueues << ", " >> 67 << "size = " << true_size() << ", " >> 68 << "empty = " << true_empty(); >> 69 std::cout << ss.str() << std::endl; 62 } 70 } >> 71 #endif 63 } 72 } 64 73 65 //============================================ 74 //======================================================================================// 66 75 67 UserTaskQueue::~UserTaskQueue() 76 UserTaskQueue::~UserTaskQueue() 68 { 77 { 69 if(!m_is_clone) 78 if(!m_is_clone) 70 { 79 { 71 for(auto& itr : *m_subqueues) 80 for(auto& itr : *m_subqueues) 72 { 81 { 73 assert(itr->empty()); << 82 assert(itr->size() == 0); 74 delete itr; 83 delete itr; 75 } 84 } 76 m_subqueues->clear(); 85 m_subqueues->clear(); 77 delete m_hold; 86 delete m_hold; 78 delete m_ntasks; 87 delete m_ntasks; 79 delete m_mutex; << 80 delete m_subqueues; 88 delete m_subqueues; 81 } 89 } 82 } 90 } 83 91 84 //============================================ 92 //======================================================================================// 85 93 86 void 94 void 87 UserTaskQueue::resize(intmax_t n) 95 UserTaskQueue::resize(intmax_t n) 88 { 96 { 89 if(!m_mutex) << 97 AutoLock l(m_mutex); 90 throw std::runtime_error("nullptr to m << 91 AutoLock lk(m_mutex); << 92 if(m_workers < n) 98 if(m_workers < n) 93 { 99 { 94 while(m_workers < n) 100 while(m_workers < n) 95 { 101 { 96 m_subqueues->emplace_back(new Task << 102 m_subqueues->push_back(new TaskSubQueue(m_ntasks)); 97 ++m_workers; 103 ++m_workers; 98 } 104 } 99 } 105 } 100 else if(m_workers > n) 106 else if(m_workers > n) 101 { 107 { 102 while(m_workers > n) 108 while(m_workers > n) 103 { 109 { 104 delete m_subqueues->back(); 110 delete m_subqueues->back(); 105 m_subqueues->pop_back(); 111 m_subqueues->pop_back(); 106 --m_workers; 112 --m_workers; 107 } 113 } 108 } 114 } 109 } 115 } 110 116 111 //============================================ 117 //======================================================================================// 112 118 113 VUserTaskQueue* 119 VUserTaskQueue* 114 UserTaskQueue::clone() 120 UserTaskQueue::clone() 115 { 121 { 116 return new UserTaskQueue(workers(), this); 122 return new UserTaskQueue(workers(), this); 117 } 123 } 118 //============================================ 124 //======================================================================================// 119 125 120 intmax_t 126 intmax_t 121 UserTaskQueue::GetThreadBin() const 127 UserTaskQueue::GetThreadBin() const 122 { 128 { 123 // get a thread id number 129 // get a thread id number 124 static thread_local intmax_t tl_bin = 130 static thread_local intmax_t tl_bin = 125 (m_thread_bin + ThreadPool::get_this_t 131 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1); 126 return tl_bin; 132 return tl_bin; 127 } 133 } 128 134 129 //============================================ 135 //======================================================================================// 130 136 131 intmax_t 137 intmax_t 132 UserTaskQueue::GetInsertBin() const 138 UserTaskQueue::GetInsertBin() const 133 { 139 { 134 return (++m_insert_bin % (m_workers + 1)); 140 return (++m_insert_bin % (m_workers + 1)); 135 } 141 } 136 142 137 //============================================ 143 //======================================================================================// 138 144 139 UserTaskQueue::task_pointer 145 UserTaskQueue::task_pointer 140 UserTaskQueue::GetThreadBinTask() 146 UserTaskQueue::GetThreadBinTask() 141 { 147 { 142 intmax_t tbin = GetThreadBin(); 148 intmax_t tbin = GetThreadBin(); 143 TaskSubQueue* task_subq = (*m_subqueues)[t 149 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)]; 144 task_pointer _task = nullptr; 150 task_pointer _task = nullptr; 145 151 146 //---------------------------------------- 152 //------------------------------------------------------------------------// 147 auto get_task = [&]() { 153 auto get_task = [&]() { 148 if(task_subq->AcquireClaim()) 154 if(task_subq->AcquireClaim()) 149 { 155 { 150 // run task 156 // run task 151 _task = task_subq->PopTask(true); 157 _task = task_subq->PopTask(true); 152 // release the claim on the bin 158 // release the claim on the bin 153 task_subq->ReleaseClaim(); 159 task_subq->ReleaseClaim(); 154 } 160 } 155 if(_task) 161 if(_task) 156 --(*m_ntasks); 162 --(*m_ntasks); 157 // return success if valid pointer 163 // return success if valid pointer 158 return (_task != nullptr); 164 return (_task != nullptr); 159 }; 165 }; 160 //---------------------------------------- 166 //------------------------------------------------------------------------// 161 167 162 // while not empty 168 // while not empty 163 while(!task_subq->empty()) 169 while(!task_subq->empty()) 164 { 170 { 165 if(get_task()) 171 if(get_task()) 166 break; 172 break; 167 } 173 } 168 return _task; 174 return _task; 169 } 175 } 170 176 171 //============================================ 177 //======================================================================================// 172 178 173 UserTaskQueue::task_pointer 179 UserTaskQueue::task_pointer 174 UserTaskQueue::GetTask(intmax_t subq, intmax_t 180 UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr) 175 { 181 { 176 // exit if empty 182 // exit if empty 177 if(this->true_empty()) 183 if(this->true_empty()) 178 return nullptr; 184 return nullptr; 179 185 180 // ensure the thread has a bin assignment 186 // ensure the thread has a bin assignment 181 intmax_t tbin = GetThreadBin(); 187 intmax_t tbin = GetThreadBin(); 182 intmax_t n = (subq < 0) ? tbin : subq; 188 intmax_t n = (subq < 0) ? tbin : subq; 183 if(nitr < 1) 189 if(nitr < 1) 184 nitr = (m_workers + 1); // * m_ntasks 190 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed); 185 191 186 if(m_hold->load(std::memory_order_relaxed) 192 if(m_hold->load(std::memory_order_relaxed)) 187 { 193 { 188 return GetThreadBinTask(); 194 return GetThreadBinTask(); 189 } 195 } 190 196 191 task_pointer _task = nullptr; 197 task_pointer _task = nullptr; 192 //---------------------------------------- 198 //------------------------------------------------------------------------// 193 auto get_task = [&](intmax_t _n) { 199 auto get_task = [&](intmax_t _n) { 194 TaskSubQueue* task_subq = (*m_subqueue 200 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)]; 195 // try to acquire a claim for the bin 201 // try to acquire a claim for the bin 196 // if acquired, no other threads will 202 // if acquired, no other threads will access bin until claim is released 197 if(!task_subq->empty() && task_subq->A 203 if(!task_subq->empty() && task_subq->AcquireClaim()) 198 { 204 { 199 // pop task out of bin 205 // pop task out of bin 200 _task = task_subq->PopTask(n == tb 206 _task = task_subq->PopTask(n == tbin); 201 // release the claim on the bin 207 // release the claim on the bin 202 task_subq->ReleaseClaim(); 208 task_subq->ReleaseClaim(); 203 } 209 } 204 if(_task) 210 if(_task) 205 --(*m_ntasks); 211 --(*m_ntasks); 206 // return success if valid pointer 212 // return success if valid pointer 207 return (_task != nullptr); 213 return (_task != nullptr); 208 }; 214 }; 209 //---------------------------------------- 215 //------------------------------------------------------------------------// 210 216 211 // there are num_workers+1 bins so there i 217 // there are num_workers+1 bins so there is always a bin that is open 212 // execute num_workers+2 iterations so the 218 // execute num_workers+2 iterations so the thread checks its bin twice 213 // while(!empty()) 219 // while(!empty()) 214 { 220 { 215 for(intmax_t i = 0; i < nitr; ++i, ++n 221 for(intmax_t i = 0; i < nitr; ++i, ++n) 216 { 222 { 217 if(get_task(n % (m_workers + 1))) 223 if(get_task(n % (m_workers + 1))) 218 return _task; 224 return _task; 219 } 225 } 220 } 226 } 221 227 222 // only reached if looped over all bins (a 228 // only reached if looped over all bins (and looked in own bin twice) 223 // and found no work so return an empty ta 229 // and found no work so return an empty task and the thread will be put to 224 // sleep if there is still no work by the 230 // sleep if there is still no work by the time it reaches its 225 // condition variable 231 // condition variable 226 return _task; 232 return _task; 227 } 233 } 228 234 229 //============================================ 235 //======================================================================================// 230 236 231 intmax_t 237 intmax_t 232 UserTaskQueue::InsertTask(task_pointer&& task, << 238 UserTaskQueue::InsertTask(task_pointer task, ThreadData* data, intmax_t subq) 233 { 239 { 234 // increment number of tasks << 240 // skip increment here (handled externally) 235 ++(*m_ntasks); 241 ++(*m_ntasks); 236 242 237 bool spin = m_hold->load(std::memory_o 243 bool spin = m_hold->load(std::memory_order_relaxed); 238 intmax_t tbin = GetThreadBin(); 244 intmax_t tbin = GetThreadBin(); 239 245 240 if(data && data->within_task) 246 if(data && data->within_task) 241 { 247 { 242 subq = tbin; 248 subq = tbin; 243 // spin = true; 249 // spin = true; 244 } 250 } 245 251 246 // subq is -1 unless specified so unless s 252 // subq is -1 unless specified so unless specified 247 // GetInsertBin() call increments a counte 253 // GetInsertBin() call increments a counter and returns 248 // counter % (num_workers + 1) so that tas 254 // counter % (num_workers + 1) so that tasks are distributed evenly 249 // among the bins 255 // among the bins 250 intmax_t n = (subq < 0) ? GetInsertBin() : 256 intmax_t n = (subq < 0) ? GetInsertBin() : subq; 251 257 252 //---------------------------------------- 258 //------------------------------------------------------------------------// 253 auto insert_task = [&](intmax_t _n) { 259 auto insert_task = [&](intmax_t _n) { 254 TaskSubQueue* task_subq = (*m_subqueue 260 TaskSubQueue* task_subq = (*m_subqueues)[_n]; 255 // TaskSubQueue* next_subq = (*m_subqu 261 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)]; 256 // if not threads bin and size differe 262 // if not threads bin and size difference, insert into smaller 257 // if(n != tbin && next_subq->size() < 263 // if(n != tbin && next_subq->size() < task_subq->size()) 258 // task_subq = next_subq; 264 // task_subq = next_subq; 259 // try to acquire a claim for the bin 265 // try to acquire a claim for the bin 260 // if acquired, no other threads will 266 // if acquired, no other threads will access bin until claim is released 261 if(task_subq->AcquireClaim()) 267 if(task_subq->AcquireClaim()) 262 { 268 { 263 // push the task into the bin 269 // push the task into the bin 264 task_subq->PushTask(std::move(task << 270 task_subq->PushTask(task); 265 // release the claim on the bin 271 // release the claim on the bin 266 task_subq->ReleaseClaim(); 272 task_subq->ReleaseClaim(); 267 // return success 273 // return success 268 return true; 274 return true; 269 } 275 } 270 return false; 276 return false; 271 }; 277 }; 272 //---------------------------------------- 278 //------------------------------------------------------------------------// 273 279 274 // if not in "hold/spin mode", where threa 280 // if not in "hold/spin mode", where thread only inserts tasks into 275 // specified bin, then move onto next bin 281 // specified bin, then move onto next bin 276 // 282 // 277 if(spin) 283 if(spin) 278 { 284 { 279 n = n % (m_workers + 1); 285 n = n % (m_workers + 1); 280 while(!insert_task(n)) 286 while(!insert_task(n)) 281 ; 287 ; 282 return n; 288 return n; 283 } 289 } 284 290 285 // there are num_workers+1 bins so there i 291 // there are num_workers+1 bins so there is always a bin that is open 286 // execute num_workers+2 iterations so the 292 // execute num_workers+2 iterations so the thread checks its bin twice 287 while(true) 293 while(true) 288 { 294 { 289 auto _n = (n++) % (m_workers + 1); 295 auto _n = (n++) % (m_workers + 1); 290 if(insert_task(_n)) 296 if(insert_task(_n)) 291 return _n; 297 return _n; 292 } 298 } >> 299 return GetThreadBin(); 293 } 300 } 294 301 295 //============================================ 302 //======================================================================================// 296 303 297 void 304 void 298 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* 305 UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func) 299 { 306 { 300 using task_group_type = TaskGroup<int << 307 typedef TaskGroup<int, int> task_group_type; 301 using thread_execute_map_t = std::map<int6 << 308 typedef std::map<int64_t, bool> thread_execute_map_t; 302 309 303 if(!tp->is_alive()) 310 if(!tp->is_alive()) 304 { 311 { 305 func(); 312 func(); 306 return; 313 return; 307 } 314 } 308 315 309 task_group_type tg{ [](int& ref, int i) { << 316 auto join_func = [=](int& ref, int i) { >> 317 ref += i; >> 318 return ref; >> 319 }; >> 320 task_group_type* tg = new task_group_type(join_func, tp); 310 321 311 // wait for all threads to finish any work 322 // wait for all threads to finish any work 312 // NOTE: will cause deadlock if called fro 323 // NOTE: will cause deadlock if called from a task 313 while(tp->get_active_threads_count() > 0) 324 while(tp->get_active_threads_count() > 0) 314 ThisThread::sleep_for(std::chrono::mil 325 ThisThread::sleep_for(std::chrono::milliseconds(10)); 315 326 316 thread_execute_map_t thread << 327 thread_execute_map_t* thread_execute_map = new thread_execute_map_t(); 317 std::vector<std::shared_ptr<VTask>> _tasks << 318 _tasks.reserve(m_workers + 1); << 319 328 320 AcquireHold(); 329 AcquireHold(); 321 for(int i = 0; i < (m_workers + 1); ++i) 330 for(int i = 0; i < (m_workers + 1); ++i) 322 { 331 { 323 if(i == GetThreadBin()) 332 if(i == GetThreadBin()) 324 continue; 333 continue; 325 334 326 //------------------------------------ 335 //--------------------------------------------------------------------// 327 auto thread_specific_func = [&]() { 336 auto thread_specific_func = [&]() { 328 ScopeDestructor _dtor = tg.get_sco << 337 static Mutex _mtx; 329 static Mutex _mtx; << 330 _mtx.lock(); 338 _mtx.lock(); 331 bool& _executed = thread_execute_m << 339 bool& _executed = (*thread_execute_map)[GetThreadBin()]; 332 _mtx.unlock(); 340 _mtx.unlock(); 333 if(!_executed) 341 if(!_executed) 334 { 342 { 335 func(); 343 func(); 336 _executed = true; 344 _executed = true; 337 return 1; 345 return 1; 338 } 346 } 339 return 0; 347 return 0; 340 }; 348 }; 341 //------------------------------------ 349 //--------------------------------------------------------------------// 342 350 343 InsertTask(tg.wrap(thread_specific_fun << 351 auto _task = tg->wrap(thread_specific_func); >> 352 //++(*m_ntasks); >> 353 // TaskSubQueue* task_subq = (*m_subqueues)[i]; >> 354 // task_subq->PushTask(_task); >> 355 InsertTask(_task, ThreadData::GetInstance(), i); 344 } 356 } 345 357 346 tp->notify_all(); 358 tp->notify_all(); 347 int nexecuted = tg.join(); << 359 int nexecuted = tg->join(); 348 if(nexecuted != m_workers) 360 if(nexecuted != m_workers) 349 { 361 { 350 std::stringstream msg; 362 std::stringstream msg; 351 msg << "Failure executing routine on a 363 msg << "Failure executing routine on all threads! Only " << nexecuted 352 << " threads executed function out << 364 << " threads executed function out of " << m_workers; 353 std::cerr << msg.str() << std::endl; 365 std::cerr << msg.str() << std::endl; >> 366 // Exception("UserTaskQueue::ExecuteOnAllThreads", "TaskQueue0000", >> 367 // JustWarning, msg); 354 } 368 } >> 369 delete thread_execute_map; 355 ReleaseHold(); 370 ReleaseHold(); 356 } 371 } 357 372 358 //============================================ 373 //======================================================================================// 359 374 360 void 375 void 361 UserTaskQueue::ExecuteOnSpecificThreads(Thread 376 UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp, 362 functi 377 function_type func) 363 { 378 { 364 using task_group_type = TaskGroup<int << 379 typedef TaskGroup<int, int> task_group_type; 365 using thread_execute_map_t = std::map<int6 << 380 typedef std::map<int64_t, bool> thread_execute_map_t; 366 381 367 task_group_type tg{ [](int& ref, int i) { << 382 auto join_func = [=](int& ref, int i) { >> 383 ref += i; >> 384 return ref; >> 385 }; >> 386 task_group_type* tg = new task_group_type(join_func, tp); 368 387 369 // wait for all threads to finish any work 388 // wait for all threads to finish any work 370 // NOTE: will cause deadlock if called fro 389 // NOTE: will cause deadlock if called from a task 371 while(tp->get_active_threads_count() > 0) 390 while(tp->get_active_threads_count() > 0) 372 ThisThread::sleep_for(std::chrono::mil 391 ThisThread::sleep_for(std::chrono::milliseconds(10)); 373 392 374 if(!tp->is_alive()) 393 if(!tp->is_alive()) 375 { 394 { 376 func(); 395 func(); 377 return; 396 return; 378 } 397 } 379 398 380 thread_execute_map_t thread_execute_map{}; << 399 thread_execute_map_t* thread_execute_map = new thread_execute_map_t(); 381 400 382 //======================================== 401 //========================================================================// 383 // wrap the function so that it will only 402 // wrap the function so that it will only be executed if the thread 384 // has an ID in the set 403 // has an ID in the set 385 auto thread_specific_func = [&]() { << 404 auto thread_specific_func = [=]() { 386 ScopeDestructor _dtor = tg.get_scope_d << 405 static Mutex _mtx; 387 static Mutex _mtx; << 388 _mtx.lock(); 406 _mtx.lock(); 389 bool& _executed = thread_execute_map[G << 407 bool& _executed = (*thread_execute_map)[GetThreadBin()]; 390 _mtx.unlock(); 408 _mtx.unlock(); 391 if(!_executed && tid_set.count(ThisThr 409 if(!_executed && tid_set.count(ThisThread::get_id()) > 0) 392 { 410 { 393 func(); 411 func(); 394 _executed = true; 412 _executed = true; 395 return 1; 413 return 1; 396 } 414 } 397 return 0; 415 return 0; 398 }; 416 }; 399 //======================================== 417 //========================================================================// 400 418 401 if(tid_set.count(ThisThread::get_id()) > 0 419 if(tid_set.count(ThisThread::get_id()) > 0) 402 func(); 420 func(); 403 421 404 AcquireHold(); 422 AcquireHold(); 405 for(int i = 0; i < (m_workers + 1); ++i) 423 for(int i = 0; i < (m_workers + 1); ++i) 406 { 424 { 407 if(i == GetThreadBin()) 425 if(i == GetThreadBin()) 408 continue; 426 continue; 409 427 410 InsertTask(tg.wrap(thread_specific_fun << 428 auto _task = tg->wrap(thread_specific_func); >> 429 InsertTask(_task, ThreadData::GetInstance(), i); 411 } 430 } 412 tp->notify_all(); 431 tp->notify_all(); 413 decltype(tid_set.size()) nexecuted = tg.jo << 432 int nexecuted = tg->join(); 414 if(nexecuted != tid_set.size()) << 433 if(nexecuted != m_workers) 415 { 434 { 416 std::stringstream msg; 435 std::stringstream msg; 417 msg << "Failure executing routine on s << 436 msg << "Failure executing routine on all threads! Only " << nexecuted 418 << " threads executed function out << 437 << " threads executed function out of " << tid_set.size(); 419 std::cerr << msg.str() << std::endl; 438 std::cerr << msg.str() << std::endl; >> 439 // Exception("UserTaskQueue::ExecuteOnSpecificThreads", "TaskQueue0001", >> 440 // JustWarning, msg); 420 } 441 } >> 442 delete thread_execute_map; 421 ReleaseHold(); 443 ReleaseHold(); 422 } 444 } 423 445 424 //============================================ 446 //======================================================================================// 425 447 426 void 448 void 427 UserTaskQueue::AcquireHold() 449 UserTaskQueue::AcquireHold() 428 { 450 { 429 bool _hold; 451 bool _hold; 430 while(!(_hold = m_hold->load(std::memory_o 452 while(!(_hold = m_hold->load(std::memory_order_relaxed))) 431 { 453 { 432 m_hold->compare_exchange_strong(_hold, 454 m_hold->compare_exchange_strong(_hold, true, std::memory_order_release, 433 std::m 455 std::memory_order_relaxed); 434 } 456 } 435 } 457 } 436 458 437 //============================================ 459 //======================================================================================// 438 460 439 void 461 void 440 UserTaskQueue::ReleaseHold() 462 UserTaskQueue::ReleaseHold() 441 { 463 { 442 bool _hold; 464 bool _hold; 443 while((_hold = m_hold->load(std::memory_or 465 while((_hold = m_hold->load(std::memory_order_relaxed))) 444 { 466 { 445 m_hold->compare_exchange_strong(_hold, 467 m_hold->compare_exchange_strong(_hold, false, std::memory_order_release, 446 std::m 468 std::memory_order_relaxed); 447 } 469 } 448 } 470 } 449 471 450 //============================================ 472 //======================================================================================// 451 << 452 } // namespace PTL << 453 473