OpenVDB  12.0.0
StreamCompression.h
Go to the documentation of this file.
1 // Copyright Contributors to the OpenVDB Project
2 // SPDX-License-Identifier: Apache-2.0
3 
4 /// @file points/StreamCompression.h
5 ///
6 /// @author Dan Bailey
7 ///
8 /// @brief Convenience wrappers for using Blosc and for reading and writing Paged data.
9 ///
10 /// Blosc is most effective with large (> ~256KB) blocks of data. Writing the entire
11 /// data block contiguously would provide the most optimal compression, however would
12 /// limit the ability to use delayed-loading as the whole block would be required to
13 /// be loaded from disk at once. To balance these two competing factors, Paging is used
14 /// to write out blocks of data that are a reasonable size for Blosc. These Pages are
15 /// loaded lazily, tracking the input stream pointers and creating Handles that reference
16 /// portions of the buffer. When the Page buffer is accessed, the data will be read from
17 /// the stream.
18 
19 #ifndef OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
20 #define OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
21 
22 #include <openvdb/io/io.h>
23 #include <openvdb/util/Assert.h>
24 #include <tbb/spin_mutex.h>
25 #include <memory>
26 #include <string>
27 
28 
29 class TestStreamCompression;
30 
31 namespace openvdb {
33 namespace OPENVDB_VERSION_NAME {
34 namespace compression {
35 
36 
37 // This is the minimum number of bytes below which Blosc compression is not used to
38 // avoid unnecessary computation, as Blosc offers minimal compression until this limit
39 static const int BLOSC_MINIMUM_BYTES = 48;
40 
41 // This is the minimum number of bytes below which the array is padded with zeros up
42 // to this number of bytes to allow Blosc to perform compression with small arrays
43 static const int BLOSC_PAD_BYTES = 128;
44 
45 
46 /// @brief Returns true if compression is available
48 
49 /// @brief Retrieves the size the compressed buffer will have once uncompressed
50 ///
51 /// @param buffer the compressed buffer
52 OPENVDB_API size_t bloscUncompressedSize(const char* buffer);
53 
54 /// @brief Compress into the supplied buffer.
55 ///
56 /// @param compressedBuffer the buffer to compress into
57 /// @param compressedBytes number of compressed bytes (written to this variable)
58 /// @param bufferBytes the number of bytes in compressedBuffer available to be filled
59 /// @param uncompressedBuffer the uncompressed buffer to compress
60 /// @param uncompressedBytes number of uncompressed bytes
61 OPENVDB_API void bloscCompress(char* compressedBuffer, size_t& compressedBytes,
62  const size_t bufferBytes, const char* uncompressedBuffer, const size_t uncompressedBytes);
63 
64 /// @brief Compress and return the heap-allocated compressed buffer.
65 ///
66 /// @param buffer the buffer to compress
67 /// @param uncompressedBytes number of uncompressed bytes
68 /// @param compressedBytes number of compressed bytes (written to this variable)
69 /// @param resize the compressed buffer will be exactly resized to remove the
70 /// portion used for Blosc overhead, for efficiency this can be
71 /// skipped if it is known that the resulting buffer is temporary
72 OPENVDB_API std::unique_ptr<char[]> bloscCompress(const char* buffer,
73  const size_t uncompressedBytes, size_t& compressedBytes, const bool resize = true);
74 
75 /// @brief Convenience wrapper to retrieve the compressed size of buffer when compressed
76 ///
77 /// @param buffer the uncompressed buffer
78 /// @param uncompressedBytes number of uncompressed bytes
79 OPENVDB_API size_t bloscCompressedSize(const char* buffer, const size_t uncompressedBytes);
80 
81 /// @brief Decompress into the supplied buffer. Will throw if decompression fails or
82 /// uncompressed buffer has insufficient space in which to decompress.
83 ///
84 /// @param uncompressedBuffer the uncompressed buffer to decompress into
85 /// @param expectedBytes the number of bytes expected once the buffer is decompressed
86 /// @param bufferBytes the number of bytes in uncompressedBuffer available to be filled
87 /// @param compressedBuffer the compressed buffer to decompress
88 OPENVDB_API void bloscDecompress(char* uncompressedBuffer, const size_t expectedBytes,
89  const size_t bufferBytes, const char* compressedBuffer);
90 
91 /// @brief Decompress and return the heap-allocated uncompressed buffer.
92 ///
93 /// @param buffer the buffer to decompress
94 /// @param expectedBytes the number of bytes expected once the buffer is decompressed
95 /// @param resize the returned buffer will be exactly resized to remove the
96 /// portion used for Blosc overhead, for efficiency this can be
97 /// skipped if it is known that the resulting buffer is temporary
98 OPENVDB_API std::unique_ptr<char[]> bloscDecompress(const char* buffer,
99  const size_t expectedBytes, const bool resize = true);
100 
101 
102 ////////////////////////////////////////
103 
104 
105 // 1MB = 1048576 Bytes
106 static const int PageSize = 1024 * 1024;
107 
108 
109 /// @brief Stores a variable-size, compressed, delayed-load Page of data
110 /// that is loaded into memory when accessed. Access to the Page is
111 /// thread-safe as loading and decompressing the data is protected by a mutex.
113 {
114 private:
115  struct Info
116  {
117 #ifdef OPENVDB_USE_DELAYED_LOADING
118  io::MappedFile::Ptr mappedFile;
119 #endif
121  std::streamoff filepos;
122  long compressedBytes;
123  long uncompressedBytes;
124  }; // Info
125 
126 public:
127  using Ptr = std::shared_ptr<Page>;
128 
129  Page() = default;
130 
131  /// @brief load the Page into memory
132  void load() const;
133 
134  /// @brief Uncompressed bytes of the Paged data, available
135  /// when the header has been read.
136  long uncompressedBytes() const;
137 
138  /// @brief Retrieves a data pointer at the specific @a index
139  /// @note Will force a Page load when called.
140  const char* buffer(const int index) const;
141 
142  /// @brief Read the Page header
143  void readHeader(std::istream&);
144 
145  /// @brief Read the Page buffers. If @a delayed is true, stream
146  /// pointers will be stored to load the data lazily.
147  void readBuffers(std::istream&, bool delayed);
148 
149  /// @brief Test if the data is out-of-core
150  bool isOutOfCore() const;
151 
152 private:
153  /// @brief Convenience method to store a copy of the supplied buffer
154  void copy(const std::unique_ptr<char[]>& temp, int pageSize);
155 
156  /// @brief Decompress and store the supplied data
157  void decompress(const std::unique_ptr<char[]>& temp);
158 
159  /// @brief Thread-safe loading of the data
160  void doLoad() const;
161 
162  std::unique_ptr<Info> mInfo = std::unique_ptr<Info>(new Info);
163  std::unique_ptr<char[]> mData;
164  tbb::spin_mutex mMutex;
165 }; // class Page
166 
167 
168 /// @brief A PageHandle holds a shared ptr to a Page and a specific stream
169 /// pointer to a point within the decompressed Page buffer
171 {
172 public:
173  using Ptr = std::unique_ptr<PageHandle>;
174 
175  /// @brief Create the page handle
176  /// @param page a shared ptr to the page that stores the buffer
177  /// @param index start position of the buffer to be read
178  /// @param size total size of the buffer to be read in bytes
179  PageHandle(const Page::Ptr& page, const int index, const int size);
180 
181  /// @brief Retrieve a reference to the stored page
182  Page& page();
183 
184  /// @brief Return the size of the buffer
185  int size() const { return mSize; }
186 
187  /// @brief Read and return the buffer, loading and decompressing
188  /// the Page if necessary.
189  std::unique_ptr<char[]> read();
190 
191  /// @brief Return a copy of this PageHandle
192  Ptr copy() { return Ptr(new PageHandle(mPage, mIndex, mSize)); }
193 
194 protected:
195  friend class ::TestStreamCompression;
196 
197 private:
198  Page::Ptr mPage;
199  int mIndex = -1;
200  int mSize = 0;
201 }; // class PageHandle
202 
203 
204 /// @brief A Paging wrapper to std::istream that is responsible for reading
205 /// from a given input stream and creating Page objects and PageHandles that
206 /// reference those pages for delayed reading.
208 {
209 public:
210  using Ptr = std::shared_ptr<PagedInputStream>;
211 
212  PagedInputStream() = default;
213 
214  explicit PagedInputStream(std::istream& is);
215 
216  /// @brief Size-only mode tags the stream as only reading size data.
217  void setSizeOnly(bool sizeOnly) { mSizeOnly = sizeOnly; }
218  bool sizeOnly() const { return mSizeOnly; }
219 
220  /// @brief Set and get the input stream
221  std::istream& getInputStream() { OPENVDB_ASSERT(mIs); return *mIs; }
222  void setInputStream(std::istream& is) { mIs = &is; }
223 
224  /// @brief Creates a PageHandle to access the next @a n bytes of the Page.
225  PageHandle::Ptr createHandle(std::streamsize n);
226 
227  /// @brief Takes a @a pageHandle and updates the referenced page with the
228  /// current stream pointer position and if @a delayed is false performs
229  /// an immediate read of the data.
230  void read(PageHandle::Ptr& pageHandle, std::streamsize n, bool delayed = true);
231 
232 private:
233  int mByteIndex = 0;
234  int mUncompressedBytes = 0;
235  std::istream* mIs = nullptr;
236  Page::Ptr mPage;
237  bool mSizeOnly = false;
238 }; // class PagedInputStream
239 
240 
241 /// @brief A Paging wrapper to std::ostream that is responsible for writing
242 /// to a given output stream at intervals set by the PageSize. As Pages are
243 /// variable in size, they are flushed to disk as soon as sufficiently large.
245 {
246 public:
247  using Ptr = std::shared_ptr<PagedOutputStream>;
248 
250 
251  explicit PagedOutputStream(std::ostream& os);
252 
253  /// @brief Size-only mode tags the stream as only writing size data.
254  void setSizeOnly(bool sizeOnly) { mSizeOnly = sizeOnly; }
255  bool sizeOnly() const { return mSizeOnly; }
256 
257  /// @brief Set and get the output stream
258  std::ostream& getOutputStream() { OPENVDB_ASSERT(mOs); return *mOs; }
259  void setOutputStream(std::ostream& os) { mOs = &os; }
260 
261  /// @brief Writes the given @a str buffer of size @a n
262  PagedOutputStream& write(const char* str, std::streamsize n);
263 
264  /// @brief Manually flushes the current page to disk if non-zero
265  void flush();
266 
267 private:
268  /// @brief Compress the @a buffer of @a size bytes and write
269  /// out to the stream.
270  void compressAndWrite(const char* buffer, size_t size);
271 
272  /// @brief Resize the internal page buffer to @a size bytes
273  void resize(size_t size);
274 
275  std::unique_ptr<char[]> mData = std::unique_ptr<char[]>(new char[PageSize]);
276  std::unique_ptr<char[]> mCompressedData = nullptr;
277  size_t mCapacity = PageSize;
278  int mBytes = 0;
279  std::ostream* mOs = nullptr;
280  bool mSizeOnly = false;
281 }; // class PagedOutputStream
282 
283 
284 } // namespace compression
285 } // namespace OPENVDB_VERSION_NAME
286 } // namespace openvdb
287 
288 #endif // OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
#define OPENVDB_API
Definition: Platform.h:268
OPENVDB_API size_t bloscUncompressedSize(const char *buffer)
Retrieves the uncompressed size of buffer when uncompressed.
std::istream & getInputStream()
Definition: StreamCompression.h:221
void setSizeOnly(bool sizeOnly)
Size-only mode tags the stream as only reading size data.
Definition: StreamCompression.h:217
OPENVDB_API std::unique_ptr< char[]> bloscCompress(const char *buffer, const size_t uncompressedBytes, size_t &compressedBytes, const bool resize=true)
Compress and return the heap-allocated compressed buffer.
Stores a variable-size, compressed, delayed-load Page of data that is loaded into memory when accesse...
Definition: StreamCompression.h:112
bool sizeOnly() const
Definition: StreamCompression.h:218
std::shared_ptr< PagedInputStream > Ptr
Definition: StreamCompression.h:210
A Paging wrapper to std::ostream that is responsible for writing from a given output stream at interv...
Definition: StreamCompression.h:244
bool sizeOnly() const
Definition: StreamCompression.h:255
std::shared_ptr< PagedOutputStream > Ptr
Definition: StreamCompression.h:247
std::ostream & getOutputStream()
Set and get the output stream.
Definition: StreamCompression.h:258
std::shared_ptr< T > SharedPtr
Definition: Types.h:114
#define OPENVDB_ASSERT(X)
Definition: Assert.h:41
static const int PageSize
Definition: StreamCompression.h:106
void setInputStream(std::istream &is)
Definition: StreamCompression.h:222
Definition: Exceptions.h:13
static const int BLOSC_MINIMUM_BYTES
Definition: StreamCompression.h:39
std::shared_ptr< Page > Ptr
Definition: StreamCompression.h:127
int size() const
Return the size of the buffer.
Definition: StreamCompression.h:185
std::unique_ptr< PageHandle > Ptr
Definition: StreamCompression.h:173
static const int BLOSC_PAD_BYTES
Definition: StreamCompression.h:43
A PageHandle holds a unique ptr to a Page and a specific stream pointer to a point within the decompr...
Definition: StreamCompression.h:170
void setOutputStream(std::ostream &os)
Definition: StreamCompression.h:259
OPENVDB_API std::unique_ptr< char[]> bloscDecompress(const char *buffer, const size_t expectedBytes, const bool resize=true)
Decompress and return the the heap-allocated uncompressed buffer.
OPENVDB_API bool bloscCanCompress()
Returns true if compression is available.
void setSizeOnly(bool sizeOnly)
Size-only mode tags the stream as only writing size data.
Definition: StreamCompression.h:254
A Paging wrapper to std::istream that is responsible for reading from a given input stream and creati...
Definition: StreamCompression.h:207
#define OPENVDB_VERSION_NAME
The version namespace name for this library version.
Definition: version.h.in:121
OPENVDB_API size_t bloscCompressedSize(const char *buffer, const size_t uncompressedBytes)
Convenience wrapper to retrieve the compressed size of buffer when compressed.
#define OPENVDB_USE_VERSION_NAMESPACE
Definition: version.h.in:218
Ptr copy()
Return a copy of this PageHandle.
Definition: StreamCompression.h:192