OpenVDB  12.0.0
Queue.h
Go to the documentation of this file.
1 // Copyright Contributors to the OpenVDB Project
2 // SPDX-License-Identifier: Apache-2.0
3 
4 /// @file Queue.h
5 /// @author Peter Cucka
6 
7 #ifndef OPENVDB_IO_QUEUE_HAS_BEEN_INCLUDED
8 #define OPENVDB_IO_QUEUE_HAS_BEEN_INCLUDED
9 
10 #include <openvdb/Types.h>
11 #include <openvdb/Grid.h>
12 #include <algorithm> // for std::copy
13 #include <functional>
14 #include <iterator> // for std::back_inserter
15 #include <memory>
16 
17 
18 namespace openvdb {
20 namespace OPENVDB_VERSION_NAME {
21 namespace io {
22 
23 class Archive;
24 
25 /// @brief Queue for asynchronous output of grids to files or streams
26 ///
27 /// @warning The queue holds shared pointers to grids. It is not safe
28 /// to modify a grid that has been placed in the queue. Instead,
29 /// make a deep copy of the grid (Grid::deepCopy()).
30 ///
31 /// @par Example:
32 /// @code
33 /// #include <openvdb/openvdb.h>
34 /// #include <openvdb/io/Queue.h>
35 /// #include <tbb/concurrent_hash_map.h>
36 /// #include <functional>
37 ///
38 /// using openvdb::io::Queue;
39 ///
40 /// struct MyNotifier
41 /// {
42 /// // Use a concurrent container, because queue callback functions
43 /// // must be thread-safe.
44 /// using FilenameMap = tbb::concurrent_hash_map<Queue::Id, std::string>;
45 /// FilenameMap filenames;
46 ///
47 /// // Callback function that prints the status of a completed task.
48 /// void callback(Queue::Id id, Queue::Status status)
49 /// {
50 /// const bool ok = (status == Queue::SUCCEEDED);
51 /// FilenameMap::accessor acc;
52 /// if (filenames.find(acc, id)) {
53 /// std::cout << (ok ? "wrote " : "failed to write ")
54 /// << acc->second << std::endl;
55 /// filenames.erase(acc);
56 /// }
57 /// }
58 /// };
59 ///
60 /// int main()
61 /// {
62 /// // Construct an object to receive notifications from the queue.
63 /// // The object's lifetime must exceed the queue's.
64 /// MyNotifier notifier;
65 ///
66 /// Queue queue;
67 ///
68 /// // Register the callback() method of the MyNotifier object
69 /// // to receive notifications of completed tasks.
70 /// queue.addNotifier(std::bind(&MyNotifier::callback, &notifier,
71 /// std::placeholders::_1, std::placeholders::_2));
72 ///
73 /// // Queue grids for output (e.g., for each step of a simulation).
74 /// for (int step = 1; step <= 10; ++step) {
75 /// openvdb::FloatGrid::Ptr grid = ...;
76 ///
77 /// std::ostringstream os;
78 /// os << "mygrid." << step << ".vdb";
79 /// const std::string filename = os.str();
80 ///
81 /// Queue::Id id = queue.writeGrid(grid, openvdb::io::File(filename));
82 ///
83 /// // Associate the filename with the ID of the queued task.
84 /// MyNotifier::FilenameMap::accessor acc;
85 /// notifier.filenames.insert(acc, id);
86 /// acc->second = filename;
87 /// }
88 /// }
89 /// @endcode
90 /// Output:
91 /// @code
92 /// wrote mygrid.1.vdb
93 /// wrote mygrid.2.vdb
94 /// wrote mygrid.4.vdb
95 /// wrote mygrid.3.vdb
96 /// ...
97 /// wrote mygrid.10.vdb
98 /// @endcode
99 /// Note that tasks do not necessarily complete in the order in which they were queued.
101 {
102 public:
103  /// Default maximum queue length (see setCapacity())
104  static const Index32 DEFAULT_CAPACITY = 100;
105  /// @brief Default maximum time in seconds to wait to queue a task
106  /// when the queue is full (see setTimeout())
107  static const Index32 DEFAULT_TIMEOUT = 120; // seconds
108 
109  /// ID number of a queued task or of a registered notification callback
110  using Id = Index32;
111 
112  /// Status of a queued task
113  enum Status { UNKNOWN, PENDING, SUCCEEDED, FAILED };
114 
115 
116  /// Construct a queue with the given capacity.
117  explicit Queue(Index32 capacity = DEFAULT_CAPACITY);
118  /// Block until all queued tasks complete (successfully or unsuccessfully).
119  ~Queue();
120 
121  /// @brief Return @c true if the queue is empty.
122  bool empty() const;
123  /// @brief Return the number of tasks currently in the queue.
124  Index32 size() const;
125 
126  /// @brief Return the maximum number of tasks allowed in the queue.
127  /// @details Once the queue has reached its maximum size, adding
128  /// a new task will block until an existing task has executed.
129  Index32 capacity() const;
130  /// Set the maximum number of tasks allowed in the queue.
131  void setCapacity(Index32);
132 
133  /// Return the maximum number of seconds to wait to queue a task when the queue is full.
134  Index32 timeout() const;
135  /// Set the maximum number of seconds to wait to queue a task when the queue is full.
136  void setTimeout(Index32 seconds = DEFAULT_TIMEOUT);
137 
138  /// @brief Return the status of the task with the given ID.
139  /// @note Querying the status of a task that has already completed
140  /// (whether successfully or not) removes the task from the status registry.
141  /// Subsequent queries of its status will return UNKNOWN.
142  Status status(Id) const;
143 
144  using Notifier = std::function<void (Id, Status)>;
145  /// @brief Register a function that will be called with a task's ID
146  /// and status when that task completes, whether successfully or not.
147  /// @return an ID that can be passed to removeNotifier() to deregister the function
148  /// @details When multiple notifiers are registered, they are called
149  /// in the order in which they were registered.
150  /// @warning Notifiers are called from worker threads, so they must be thread-safe
151  /// and their lifetimes must exceed that of the queue. They must also not call,
152  /// directly or indirectly, addNotifier(), removeNotifier() or clearNotifiers(),
153  /// as that can result in a deadlock.
154  Id addNotifier(Notifier);
155  /// Deregister the notifier with the given ID.
156  void removeNotifier(Id);
157  /// Deregister all notifiers.
158  void clearNotifiers();
159 
160  /// @brief Queue a single grid for output to a file or stream.
161  /// @param grid the grid to be serialized
162  /// @param archive the io::File or io::Stream to which to output the grid
163  /// @param fileMetadata optional file-level metadata
164  /// @return an ID with which the status of the queued task can be queried
165  /// @throw RuntimeError if the task cannot be queued within the time limit
166  /// (see setTimeout()) because the queue is full
167  /// @par Example:
168  /// @code
169  /// openvdb::FloatGrid::Ptr grid = ...;
170  ///
171  /// openvdb::io::Queue queue;
172  ///
173  /// // Write the grid to the file mygrid.vdb.
174  /// queue.writeGrid(grid, openvdb::io::File("mygrid.vdb"));
175  ///
176  /// // Stream the grid to a binary string.
177  /// std::ostringstream ostr(std::ios_base::binary);
178  /// queue.writeGrid(grid, openvdb::io::Stream(ostr));
179  /// @endcode
180  Id writeGrid(GridBase::ConstPtr grid, const Archive& archive,
181  const MetaMap& fileMetadata = MetaMap());
182 
183  /// @brief Queue a container of grids for output to a file.
184  /// @param grids any iterable container of grid pointers
185  /// (e.g., a GridPtrVec or GridPtrSet)
186  /// @param archive the io::File or io::Stream to which to output the grids
187  /// @param fileMetadata optional file-level metadata
188  /// @return an ID with which the status of the queued task can be queried
189  /// @throw RuntimeError if the task cannot be queued within the time limit
190  /// (see setTimeout()) because the queue is full
191  /// @par Example:
192  /// @code
193  /// openvdb::FloatGrid::Ptr floatGrid = ...;
194  /// openvdb::BoolGrid::Ptr boolGrid = ...;
195  /// openvdb::GridPtrVec grids;
196  /// grids.push_back(floatGrid);
197  /// grids.push_back(boolGrid);
198  ///
199  /// openvdb::io::Queue queue;
200  ///
201  /// // Write the grids to the file mygrid.vdb.
202  /// queue.write(grids, openvdb::io::File("mygrid.vdb"));
203  ///
204  /// // Stream the grids to a (binary) string.
205  /// std::ostringstream ostr(std::ios_base::binary);
206  /// queue.write(grids, openvdb::io::Stream(ostr));
207  /// @endcode
208  template<typename GridPtrContainer>
209  Id write(const GridPtrContainer& grids, const Archive& archive,
210  const MetaMap& fileMetadata = MetaMap());
211 
212 private:
213  // Disallow copying of instances of this class.
214  Queue(const Queue&);
215  Queue& operator=(const Queue&);
216 
217  Id writeGridVec(const GridCPtrVec&, const Archive&, const MetaMap&);
218 
219  struct Impl;
220  std::unique_ptr<Impl> mImpl;
221 }; // class Queue
222 
223 
224 template<typename GridPtrContainer>
225 inline Queue::Id
226 Queue::write(const GridPtrContainer& container,
227  const Archive& archive, const MetaMap& metadata)
228 {
229  GridCPtrVec grids;
230  std::copy(container.begin(), container.end(), std::back_inserter(grids));
231  return this->writeGridVec(grids, archive, metadata);
232 }
233 
234 // Specialization for vectors of const Grid pointers; no copying necessary
235 template<>
236 inline Queue::Id
237 Queue::write<GridCPtrVec>(const GridCPtrVec& grids,
238  const Archive& archive, const MetaMap& metadata)
239 {
240  return this->writeGridVec(grids, archive, metadata);
241 }
242 
243 } // namespace io
244 } // namespace OPENVDB_VERSION_NAME
245 } // namespace openvdb
246 
247 #endif // OPENVDB_IO_QUEUE_HAS_BEEN_INCLUDED
#define OPENVDB_API
Definition: Platform.h:268
Definition: Queue.h:113
std::vector< GridBase::ConstPtr > GridCPtrVec
Definition: Grid.h:513
Queue for asynchronous output of grids to files or streams.
Definition: Queue.h:100
Index32 Id
ID number of a queued task or of a registered notification callback.
Definition: Queue.h:110
std::function< void(Id, Status)> Notifier
Definition: Queue.h:144
Grid serializer/unserializer.
Definition: Archive.h:31
Container that maps names (strings) to values of arbitrary types.
Definition: MetaMap.h:19
bool empty(const char *str)
Tests whether a C-string str is empty, i.e. whether its first character is '\0'.
Definition: Util.h:144
Definition: Exceptions.h:13
SharedPtr< const GridBase > ConstPtr
Definition: Grid.h:81
Status
Status of a queued task.
Definition: Queue.h:113
uint32_t Index32
Definition: Types.h:52
#define OPENVDB_VERSION_NAME
The version namespace name for this library version.
Definition: version.h.in:121
#define OPENVDB_USE_VERSION_NAMESPACE
Definition: version.h.in:218