Line | Branch | Exec | Source |
---|---|---|---|
1 | // Copyright Contributors to the OpenVDB Project | ||
2 | // SPDX-License-Identifier: MPL-2.0 | ||
3 | |||
4 | /// @file Queue.cc | ||
5 | /// @author Peter Cucka | ||
6 | |||
7 | #include "Queue.h" | ||
8 | #include "File.h" | ||
9 | #include "Stream.h" | ||
10 | #include "openvdb/Exceptions.h" | ||
11 | #include "openvdb/util/logging.h" | ||
12 | |||
13 | #include <tbb/concurrent_hash_map.h> | ||
14 | #include <tbb/task_arena.h> | ||
15 | |||
16 | #include <thread> | ||
17 | #include <algorithm> // for std::max() | ||
18 | #include <atomic> | ||
19 | #include <iostream> | ||
20 | #include <map> | ||
21 | #include <mutex> | ||
22 | #include <chrono> | ||
23 | |||
24 | |||
25 | namespace openvdb { | ||
26 | OPENVDB_USE_VERSION_NAMESPACE | ||
27 | namespace OPENVDB_VERSION_NAME { | ||
28 | namespace io { | ||
29 | |||
30 | namespace { | ||
31 | |||
32 | // Abstract base class for queuable TBB tasks that adds a task completion callback | ||
33 | class Task | ||
34 | { | ||
35 | public: | ||
36 | 20 | Task(Queue::Id id): mId(id) {} | |
37 |
2/4✓ Branch 0 taken 57 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
58 | virtual ~Task() {} |
38 | |||
39 | 38 | Queue::Id id() const { return mId; } | |
40 | |||
41 | 19 | void setNotifier(Queue::Notifier& notifier) { mNotify = notifier; } | |
42 | virtual void execute() const = 0; | ||
43 | |||
44 | protected: | ||
45 |
1/2✓ Branch 0 taken 19 times.
✗ Branch 1 not taken.
|
19 | void notify(Queue::Status status) const { if (mNotify) mNotify(this->id(), status); } |
46 | |||
47 | private: | ||
48 | Queue::Id mId; | ||
49 | Queue::Notifier mNotify; | ||
50 | }; | ||
51 | |||
52 | |||
53 | // Queuable TBB task that writes one or more grids to a .vdb file or an output stream | ||
54 | class OutputTask : public Task | ||
55 | { | ||
56 | public: | ||
57 | 20 | OutputTask(Queue::Id id, const GridCPtrVec& grids, const Archive& archive, | |
58 | const MetaMap& metadata) | ||
59 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | : Task(id) |
60 | , mGrids(grids) | ||
61 | 20 | , mArchive(archive.copy()) | |
62 |
3/6✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
|
20 | , mMetadata(metadata) {} |
63 | 232 | ~OutputTask() override {} | |
64 | |||
65 |
1/2✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
|
19 | void execute() const override |
66 | { | ||
67 | Queue::Status status = Queue::FAILED; | ||
68 | try { | ||
69 |
1/2✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
|
19 | mArchive->write(mGrids, mMetadata); |
70 | status = Queue::SUCCEEDED; | ||
71 | ✗ | } catch (std::exception& e) { | |
72 | ✗ | if (const char* msg = e.what()) { | |
73 | ✗ | OPENVDB_LOG_ERROR(msg); | |
74 | } | ||
75 | ✗ | } catch (...) {} | |
76 | this->notify(status); | ||
77 | 19 | } | |
78 | |||
79 | private: | ||
80 | GridCPtrVec mGrids; | ||
81 | SharedPtr<Archive> mArchive; | ||
82 | MetaMap mMetadata; | ||
83 | }; | ||
84 | |||
85 | } // unnamed namespace | ||
86 | |||
87 | |||
88 | //////////////////////////////////////// | ||
89 | |||
90 | |||
91 | // Private implementation details of a Queue | ||
92 | struct Queue::Impl | ||
93 | { | ||
94 | using NotifierMap = std::map<Queue::Id, Queue::Notifier>; | ||
95 | /// @todo Provide more information than just "succeeded" or "failed"? | ||
96 | using StatusMap = tbb::concurrent_hash_map<Queue::Id, Queue::Status>; | ||
97 | |||
98 | 3 | Impl() | |
99 | 3 | : mTimeout(Queue::DEFAULT_TIMEOUT) | |
100 | , mCapacity(Queue::DEFAULT_CAPACITY) | ||
101 | , mNextId(1) | ||
102 | 3 | , mNextNotifierId(1) | |
103 | { | ||
104 | mNumTasks = 0; // note: must explicitly zero-initialize atomics | ||
105 | 3 | } | |
106 | 3 | ~Impl() {} | |
107 | |||
108 | // Disallow copying of instances of this class. | ||
109 | Impl(const Impl&); | ||
110 | Impl& operator=(const Impl&); | ||
111 | |||
112 | // This method might be called from multiple threads. | ||
113 |
1/2✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
|
38 | void setStatus(Queue::Id id, Queue::Status status) |
114 | { | ||
115 | StatusMap::accessor acc; | ||
116 |
1/2✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
|
38 | mStatus.insert(acc, id); |
117 |
1/2✓ Branch 0 taken 38 times.
✗ Branch 1 not taken.
|
38 | acc->second = status; |
118 | 38 | } | |
119 | |||
120 | // This method might be called from multiple threads. | ||
121 | 19 | void setStatusWithNotification(Queue::Id id, Queue::Status status) | |
122 | { | ||
123 | 19 | const bool completed = (status == SUCCEEDED || status == FAILED); | |
124 | |||
125 | // Update the task's entry in the status map with the new status. | ||
126 | 19 | this->setStatus(id, status); | |
127 | |||
128 | // If the client registered any callbacks, call them now. | ||
129 | bool didNotify = false; | ||
130 | { | ||
131 | // tbb::concurrent_hash_map does not support concurrent iteration | ||
132 | // (i.e., iteration concurrent with insertion or deletion), | ||
133 | // so we use a mutex-protected STL map instead. But if a callback | ||
134 | // invokes a notifier method such as removeNotifier() on this queue, | ||
135 | // the result will be a deadlock. | ||
136 | /// @todo Is it worth trying to avoid such deadlocks? | ||
137 | 19 | std::lock_guard<std::mutex> lock(mNotifierMutex); | |
138 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 10 times.
|
19 | if (!mNotifiers.empty()) { |
139 | didNotify = true; | ||
140 | for (NotifierMap::const_iterator it = mNotifiers.begin(); | ||
141 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 9 times.
|
18 | it != mNotifiers.end(); ++it) |
142 | { | ||
143 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | it->second(id, status); |
144 | } | ||
145 | } | ||
146 | } | ||
147 | // If the task completed and callbacks were called, remove | ||
148 | // the task's entry from the status map. | ||
149 |
1/2✓ Branch 0 taken 19 times.
✗ Branch 1 not taken.
|
19 | if (completed) { |
150 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 10 times.
|
19 | if (didNotify) { |
151 | StatusMap::accessor acc; | ||
152 |
2/4✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
|
9 | if (mStatus.find(acc, id)) { |
153 | mStatus.erase(acc); | ||
154 | } | ||
155 | } | ||
156 | --mNumTasks; | ||
157 | } | ||
158 | 19 | } | |
159 | |||
160 | 24 | bool canEnqueue() const { return mNumTasks < Int64(mCapacity); } | |
161 | |||
162 | 20 | void enqueue(OutputTask& task) | |
163 | { | ||
164 | 20 | auto start = std::chrono::steady_clock::now(); | |
165 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 19 times.
|
24 | while (!canEnqueue()) { |
166 | 5 | std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500)); | |
167 | auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( | ||
168 | 5 | std::chrono::steady_clock::now() - start); | |
169 | 5 | const double seconds = double(duration.count()) / 1000.0; | |
170 |
2/2✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 times.
|
5 | if (seconds > double(mTimeout)) { |
171 |
3/8✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
|
5 | OPENVDB_THROW(RuntimeError, |
172 | "unable to queue I/O task; " << mTimeout << "-second time limit expired"); | ||
173 | } | ||
174 | } | ||
175 |
1/2✓ Branch 2 taken 19 times.
✗ Branch 3 not taken.
|
19 | Queue::Notifier notify = std::bind(&Impl::setStatusWithNotification, this, |
176 | 19 | std::placeholders::_1, std::placeholders::_2); | |
177 | task.setNotifier(notify); | ||
178 |
1/2✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
|
19 | this->setStatus(task.id(), Queue::PENDING); |
179 | |||
180 | // get the global task arena | ||
181 | tbb::task_arena arena(tbb::task_arena::attach{}); | ||
182 |
3/8✓ Branch 2 taken 19 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 19 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 19 times.
✗ Branch 9 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
|
76 | arena.enqueue([task = std::move(task)] { task.execute(); }); |
183 | ++mNumTasks; | ||
184 | 19 | } | |
185 | |||
186 | Index32 mTimeout; | ||
187 | Index32 mCapacity; | ||
188 | std::atomic<Int32> mNumTasks; | ||
189 | Index32 mNextId; | ||
190 | StatusMap mStatus; | ||
191 | NotifierMap mNotifiers; | ||
192 | Index32 mNextNotifierId; | ||
193 | std::mutex mNotifierMutex; | ||
194 | }; | ||
195 | |||
196 | |||
197 | //////////////////////////////////////// | ||
198 | |||
199 | |||
200 | 3 | Queue::Queue(Index32 capacity): mImpl(new Impl) | |
201 | { | ||
202 | 3 | mImpl->mCapacity = capacity; | |
203 | 3 | } | |
204 | |||
205 | |||
206 | 3 | Queue::~Queue() | |
207 | { | ||
208 | // Wait for all queued tasks to complete (successfully or unsuccessfully). | ||
209 | /// @todo Allow the queue to be destroyed while there are uncompleted tasks | ||
210 | /// (e.g., by keeping a static registry of queues that also dispatches | ||
211 | /// or blocks notifications)? | ||
212 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | while (mImpl->mNumTasks > 0) { |
213 | ✗ | std::this_thread::sleep_for(/*0.5s*/std::chrono::milliseconds(500)); | |
214 | } | ||
215 | 3 | } | |
216 | |||
217 | |||
218 | //////////////////////////////////////// | ||
219 | |||
220 | |||
221 | 4 | bool Queue::empty() const { return (mImpl->mNumTasks == 0); } | |
222 | ✗ | Index32 Queue::size() const { return Index32(std::max<Int32>(0, mImpl->mNumTasks)); } | |
223 | ✗ | Index32 Queue::capacity() const { return mImpl->mCapacity; } | |
224 | ✗ | void Queue::setCapacity(Index32 n) { mImpl->mCapacity = std::max<Index32>(1, n); } | |
225 | |||
226 | /// @todo void Queue::setCapacity(Index64 bytes); | ||
227 | |||
228 | /// @todo Provide a way to limit the number of tasks in flight | ||
229 | /// (e.g., by enqueueing tbb::tasks that pop Tasks off a concurrent_queue)? | ||
230 | |||
231 | /// @todo Remove any tasks from the queue that are not currently executing. | ||
232 | //void clear() const; | ||
233 | |||
234 | ✗ | Index32 Queue::timeout() const { return mImpl->mTimeout; } | |
235 | 1 | void Queue::setTimeout(Index32 sec) { mImpl->mTimeout = sec; } | |
236 | |||
237 | |||
238 | //////////////////////////////////////// | ||
239 | |||
240 | |||
241 | Queue::Status | ||
242 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | Queue::status(Id id) const |
243 | { | ||
244 | Impl::StatusMap::const_accessor acc; | ||
245 |
2/4✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
|
9 | if (mImpl->mStatus.find(acc, id)) { |
246 | 9 | const Status status = acc->second; | |
247 |
1/2✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
|
9 | if (status == SUCCEEDED || status == FAILED) { |
248 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | mImpl->mStatus.erase(acc); |
249 | } | ||
250 | 9 | return status; | |
251 | } | ||
252 | return UNKNOWN; | ||
253 | } | ||
254 | |||
255 | |||
256 | Queue::Id | ||
257 | 1 | Queue::addNotifier(Notifier notify) | |
258 | { | ||
259 | 1 | std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex); | |
260 | 1 | Queue::Id id = mImpl->mNextNotifierId++; | |
261 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | mImpl->mNotifiers[id] = notify; |
262 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
2 | return id; |
263 | } | ||
264 | |||
265 | |||
266 | void | ||
267 | ✗ | Queue::removeNotifier(Id id) | |
268 | { | ||
269 | ✗ | std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex); | |
270 | ✗ | Impl::NotifierMap::iterator it = mImpl->mNotifiers.find(id); | |
271 | ✗ | if (it != mImpl->mNotifiers.end()) { | |
272 | ✗ | mImpl->mNotifiers.erase(it); | |
273 | } | ||
274 | } | ||
275 | |||
276 | |||
277 | void | ||
278 | ✗ | Queue::clearNotifiers() | |
279 | { | ||
280 | ✗ | std::lock_guard<std::mutex> lock(mImpl->mNotifierMutex); | |
281 | mImpl->mNotifiers.clear(); | ||
282 | } | ||
283 | |||
284 | |||
285 | //////////////////////////////////////// | ||
286 | |||
287 | |||
288 | Queue::Id | ||
289 | ✗ | Queue::writeGrid(GridBase::ConstPtr grid, const Archive& archive, const MetaMap& metadata) | |
290 | { | ||
291 | ✗ | return writeGridVec(GridCPtrVec(1, grid), archive, metadata); | |
292 | } | ||
293 | |||
294 | |||
295 | Queue::Id | ||
296 | 20 | Queue::writeGridVec(const GridCPtrVec& grids, const Archive& archive, const MetaMap& metadata) | |
297 | { | ||
298 | 20 | const Queue::Id taskId = mImpl->mNextId++; | |
299 | 40 | OutputTask task(taskId, grids, archive, metadata); | |
300 |
2/2✓ Branch 1 taken 19 times.
✓ Branch 2 taken 1 times.
|
20 | mImpl->enqueue(task); |
301 | 19 | return taskId; | |
302 | } | ||
303 | |||
304 | } // namespace io | ||
305 | } // namespace OPENVDB_VERSION_NAME | ||
306 | } // namespace openvdb | ||
307 |