1 /*************************************************************************
3 * Copyright 2016 Realm Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 **************************************************************************/
19 #ifndef REALM_IMPL_INPUT_STREAM_HPP
20 #define REALM_IMPL_INPUT_STREAM_HPP
24 #include <realm/binary_data.hpp>
25 #include <realm/impl/cont_transact_hist.hpp>
26 #include <realm/util/buffer.hpp>
35 /// Read bytes from this input stream and place them in the specified
36 /// buffer. The returned value is the actual number of bytes that were read,
37 /// and this is some number `n` such that `n <= min(size, m)` where `m` is
38 /// the number of bytes that could have been read from this stream before
39 /// reaching its end. Also, `n` cannot be zero unless `m` or `size` is
40 /// zero. The intention is that `size` should be non-zero, a the return
41 /// value used as the end-of-input indicator.
43 /// Implementations are only allowed to block (put the calling thread to
44 /// sleep) up until the point in time where the first byte can be made
46 virtual size_t read(char* buffer, size_t size) = 0;
48 virtual ~InputStream() noexcept
54 class SimpleInputStream : public InputStream {
56 SimpleInputStream(const char* data, size_t size) noexcept
61 size_t read(char* buffer, size_t size) override
63 size_t n = std::min(size, size_t(m_end - m_ptr));
64 const char* begin = m_ptr;
66 realm::safe_copy_n(begin, n, buffer);
72 const char* const m_end;
76 class NoCopyInputStream {
78 /// \return if any bytes was read.
79 /// A value of false indicates end-of-input.
80 /// If return value is true, \a begin and \a end are
81 /// updated to reflect the start and limit of a
82 /// contiguous memory chunk.
83 virtual bool next_block(const char*& begin, const char*& end) = 0;
85 virtual ~NoCopyInputStream() noexcept
91 class NoCopyInputStreamAdaptor : public NoCopyInputStream {
93 NoCopyInputStreamAdaptor(InputStream& in, char* buffer, size_t buffer_size) noexcept
96 , m_buffer_size(buffer_size)
99 bool next_block(const char*& begin, const char*& end) override
101 size_t n = m_in.read(m_buffer, m_buffer_size);
110 size_t m_buffer_size;
114 class SimpleNoCopyInputStream : public NoCopyInputStream {
116 SimpleNoCopyInputStream(const char* data, size_t size)
122 bool next_block(const char*& begin, const char*& end) override
126 size_t size = m_size;
138 class MultiLogNoCopyInputStream : public NoCopyInputStream {
140 MultiLogNoCopyInputStream(const BinaryData* logs_begin, const BinaryData* logs_end)
141 : m_logs_begin(logs_begin)
142 , m_logs_end(logs_end)
144 if (m_logs_begin != m_logs_end)
145 m_curr_buf_remaining_size = m_logs_begin->size();
148 size_t read(char* buffer, size_t size)
150 if (m_logs_begin == m_logs_end)
153 if (m_curr_buf_remaining_size > 0) {
154 size_t offset = m_logs_begin->size() - m_curr_buf_remaining_size;
155 const char* data = m_logs_begin->data() + offset;
156 size_t size_2 = std::min(m_curr_buf_remaining_size, size);
157 m_curr_buf_remaining_size -= size_2;
158 // FIXME: Eliminate the need for copying by changing the API of
159 // Replication::InputStream such that blocks can be handed over
160 // without copying. This is a straight forward change, but the
161 // result is going to be more complicated and less conventional.
162 realm::safe_copy_n(data, size_2, buffer);
167 if (m_logs_begin == m_logs_end)
169 m_curr_buf_remaining_size = m_logs_begin->size();
173 bool next_block(const char*& begin, const char*& end) override
175 while (m_logs_begin < m_logs_end) {
176 size_t result = m_logs_begin->size();
177 const char* data = m_logs_begin->data();
180 continue; // skip empty blocks
189 const BinaryData* m_logs_begin;
190 const BinaryData* m_logs_end;
191 size_t m_curr_buf_remaining_size;
195 class ChangesetInputStream : public NoCopyInputStream {
197 using version_type = History::version_type;
198 static constexpr unsigned NB_BUFFERS = 8;
200 ChangesetInputStream(History& hist, version_type begin_version, version_type end_version)
202 , m_begin_version(begin_version)
203 , m_end_version(end_version)
208 bool next_block(const char*& begin, const char*& end) override
211 BinaryData actual = m_changesets_begin->get_next();
213 if (actual.size() > 0) {
214 begin = actual.data();
215 end = actual.data() + actual.size();
219 m_changesets_begin++;
221 if (REALM_UNLIKELY(m_changesets_begin == m_changesets_end)) {
225 return false; // End of input
230 version_type m_begin_version, m_end_version;
231 BinaryIterator m_changesets[NB_BUFFERS]; // Buffer
232 BinaryIterator* m_changesets_begin = nullptr;
233 BinaryIterator* m_changesets_end = nullptr;
238 auto versions_to_get = m_end_version - m_begin_version;
239 m_valid = versions_to_get > 0;
241 if (versions_to_get > NB_BUFFERS)
242 versions_to_get = NB_BUFFERS;
243 version_type end_version = m_begin_version + versions_to_get;
244 m_history.get_changesets(m_begin_version, end_version, m_changesets);
245 m_begin_version = end_version;
246 m_changesets_begin = m_changesets;
247 m_changesets_end = m_changesets_begin + versions_to_get;
255 #endif // REALM_IMPL_INPUT_STREAM_HPP