VTK  9.3.1
vtkThreadedCallbackQueue.h
Go to the documentation of this file.
1 // SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
2 // SPDX-License-Identifier: BSD-3-Clause
23 #ifndef vtkThreadedCallbackQueue_h
24 #define vtkThreadedCallbackQueue_h
25 
26 #include "vtkObject.h"
27 #include "vtkParallelCoreModule.h" // For export macro
28 #include "vtkSmartPointer.h" // For vtkSmartPointer
29 
30 #include <atomic> // For atomic_bool
31 #include <condition_variable> // For condition variable
32 #include <deque> // For deque
33 #include <functional> // For greater
34 #include <memory> // For unique_ptr
35 #include <mutex> // For mutex
36 #include <thread> // For thread
37 #include <unordered_map> // For unordered_map
38 #include <unordered_set> // For unordered_set
39 #include <vector> // For vector
40 
41 #if !defined(__WRAP__)
42 
43 VTK_ABI_NAMESPACE_BEGIN
44 
45 class VTKPARALLELCORE_EXPORT vtkThreadedCallbackQueue : public vtkObject
46 {
47 private:
51  template <class FT>
52  struct Signature;
53 
58  template <class T, class DummyT = std::nullptr_t>
59  struct Dereference
60  {
61  struct Type;
62  };
63 
67  template <class T>
68  using DereferencedType = typename std::decay<typename Dereference<T>::Type>::type;
69 
73  template <class FT>
74  using InvokeResult = typename Signature<DereferencedType<FT>>::InvokeResult;
75 
80  class ReturnValueWrapper
81  {
82  class ReturnLValueRef;
83  class ReturnConstLValueRef;
84  };
85 
86 public:
87  static vtkThreadedCallbackQueue* New();
89  void PrintSelf(ostream& os, vtkIndent indent) override;
90 
92 
96  ~vtkThreadedCallbackQueue() override;
97 
103  {
104  public:
105  vtkBaseTypeMacro(vtkSharedFutureBase, vtkObjectBase);
106 
108  : NumberOfPriorSharedFuturesRemaining(0)
109  , Status(CONSTRUCTING)
110  {
111  }
112 
116  virtual void Wait() const
117  {
118  if (this->Status == READY)
119  {
120  return;
121  }
122  std::unique_lock<std::mutex> lock(this->Mutex);
123  if (this->Status != READY)
124  {
125  this->ConditionVariable.wait(lock, [this] { return this->Status == READY; });
126  }
127  }
128 
130 
131  private:
135  virtual void operator()() = 0;
136 
140  std::atomic_int NumberOfPriorSharedFuturesRemaining;
141 
147  std::atomic_int Status;
148 
154  vtkIdType InvokerIndex;
155 
160  bool IsHighPriority = false;
161 
166  std::vector<vtkSmartPointer<vtkSharedFutureBase>> Dependents;
167 
168  mutable std::mutex Mutex;
169  mutable std::condition_variable ConditionVariable;
170 
171  vtkSharedFutureBase(const vtkSharedFutureBase& other) = delete;
172  void operator=(const vtkSharedFutureBase& other) = delete;
173  };
174 
178  template <class ReturnT>
180  {
181  public:
182  vtkAbstractTypeMacro(vtkSharedFuture<ReturnT>, vtkSharedFutureBase);
183 
184  using ReturnLValueRef = typename ReturnValueWrapper<ReturnT>::ReturnLValueRef;
185  using ReturnConstLValueRef = typename ReturnValueWrapper<ReturnT>::ReturnConstLValueRef;
186 
187  vtkSharedFuture() = default;
188 
193  ReturnLValueRef Get();
194 
199  ReturnConstLValueRef Get() const;
200 
202 
203  private:
204  ReturnValueWrapper<ReturnT> ReturnValue;
205 
206  vtkSharedFuture(const vtkSharedFuture<ReturnT>& other) = delete;
207  void operator=(const vtkSharedFuture<ReturnT>& other) = delete;
208  };
209 
211  template <class ReturnT>
213 
275  template <class FT, class... ArgsT>
276  SharedFuturePointer<InvokeResult<FT>> Push(FT&& f, ArgsT&&... args);
277 
286  template <class SharedFutureContainerT, class FT, class... ArgsT>
288  SharedFutureContainerT&& priorSharedFutures, FT&& f, ArgsT&&... args);
289 
304  template <class SharedFutureContainerT>
305  void Wait(SharedFutureContainerT&& priorSharedFuture);
306 
308 
316  template <class ReturnT>
318  template <class ReturnT>
320  const SharedFuturePointer<ReturnT>& future);
322 
331  void SetNumberOfThreads(int numberOfThreads);
332 
340  int GetNumberOfThreads() const { return this->NumberOfThreads; }
341 
342 private:
344 
348  template <class FT, class... ArgsT>
349  class vtkInvoker;
351 
352  struct InvokerImpl;
353 
354  template <class FT, class... ArgsT>
355  using InvokerPointer = vtkSmartPointer<vtkInvoker<FT, ArgsT...>>;
356 
357  class ThreadWorker;
358 
359  friend class ThreadWorker;
360 
366  enum Status
367  {
374  CONSTRUCTING = 0x00,
375 
379  ON_HOLD = 0x01,
380 
385  ENQUEUED = 0x02,
386 
390  RUNNING = 0x04,
391 
395  READY = 0x08
396  };
397 
404  void Sync(int startId = 0);
405 
410  void PopFrontNullptr();
411 
417  void SignalDependentSharedFutures(vtkSharedFutureBase* invoker);
418 
424  template <class SharedFutureContainerT, class InvokerT>
425  void HandleDependentInvoker(SharedFutureContainerT&& priorSharedFutures, InvokerT&& invoker);
426 
431  void Invoke(vtkSharedFutureBase* invoker, std::unique_lock<std::mutex>& lock);
432 
437  bool TryInvoke(vtkSharedFutureBase* invoker);
438 
443  template <class FT, class... ArgsT>
444  void PushControl(FT&& f, ArgsT&&... args);
445 
449  template <class SharedFutureContainerT>
450  static bool MustWait(SharedFutureContainerT&& priorSharedFutures);
451 
455  std::deque<SharedFutureBasePointer> InvokerQueue;
456 
460  std::mutex Mutex;
461 
465  std::mutex ControlMutex;
466 
471  std::mutex DestroyMutex;
472 
476  std::mutex ThreadIdToIndexMutex;
477 
478  std::condition_variable ConditionVariable;
479 
484  std::atomic_bool Destroying{ false };
485 
489  std::atomic_int NumberOfThreads;
490 
491  std::vector<std::thread> Threads;
492 
500  std::unordered_map<std::thread::id, std::shared_ptr<std::atomic_int>> ThreadIdToIndex;
501 
506  std::unordered_set<SharedFutureBasePointer> ControlFutures;
507 
509  void operator=(const vtkThreadedCallbackQueue&) = delete;
510 };
511 
512 VTK_ABI_NAMESPACE_END
513 
514 #include "vtkThreadedCallbackQueue.txx"
515 
516 #endif
517 #endif
518 // VTK-HeaderTest-Exclude: vtkThreadedCallbackQueue.h
abstract base class for most VTK objects
Definition: vtkObject.h:51
void PrintSelf(ostream &os, vtkIndent indent) override
Methods invoked by print to print information about the object including superclasses.
typename ReturnValueWrapper< ReturnT >::ReturnLValueRef ReturnLValueRef
Hold a reference to a vtkObjectBase instance.
Definition: vtkMeta.h:23
int vtkIdType
Definition: vtkType.h:315
vtkSharedFutureBase is the base block to store, run, get the returned value of the tasks that are pus...
vtkSharedFuture< ReturnT >::ReturnLValueRef Get(SharedFuturePointer< ReturnT > &future)
Get the returned value from the task associated with the input future.
A vtkSharedFuture is an object returned by the methods Push and PushDependent.
void Wait(SharedFutureContainerT &&priorSharedFuture)
This method blocks the current thread until all the tasks associated with each shared future inside p...
a simple class to control print indentation
Definition: vtkIndent.h:28
int GetNumberOfThreads() const
Returns the number of allocated threads.
SharedFuturePointer< InvokeResult< FT > > Push(FT &&f, ArgsT &&...args)
Pushes a function f to be passed args...
abstract base class for most VTK objects
Definition: vtkObjectBase.h:62
simple threaded callback queue
void SetNumberOfThreads(int numberOfThreads)
Sets the number of threads.
typename ReturnValueWrapper< ReturnT >::ReturnConstLValueRef ReturnConstLValueRef
SharedFuturePointer< InvokeResult< FT > > PushDependent(SharedFutureContainerT &&priorSharedFutures, FT &&f, ArgsT &&...args)
This method behaves the same way Push does, with the addition of a container of futures.
virtual void Wait() const
Blocks current thread until the task associated with this future has terminated.
static vtkObject * New()
Create an object with Debug turned off, modified time initialized to zero, and reference counting on...