1 ////////////////////////////////////////////////////////////////////////////
3 // Copyright 2016 Realm Inc.
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
17 ////////////////////////////////////////////////////////////////////////////
19 #include "impl/collection_notifier.hpp"
21 #include "impl/realm_coordinator.hpp"
22 #include "shared_realm.hpp"
24 #include <realm/group_shared.hpp>
25 #include <realm/link_view.hpp>
27 using namespace realm;
28 using namespace realm::_impl;
// Builds the predicate used to decide whether a row of `root_table` is
// reported as modified for the transaction described by `info`.
//
// When `info.schema_changed` is set, the related-table list is rebuilt with
// set_table(root_table) first. The callable handed back is one of:
//   - a constant-false predicate, when none of the related tables has any
//     recorded modifications in `info`;
//   - a direct lookup in that table's modification set, when exactly one
//     table is related;
//   - a DeepChangeChecker built from `info`, `root_table` and
//     m_related_tables in every other case.
// The single-table lambda refers to the modification set inside `info` by
// reference, so `info` has to stay valid for as long as the callable is used.
30 std::function<bool (size_t)>
31 CollectionNotifier::get_modification_checker(TransactionChangeInfo const& info,
32 Table const& root_table)
34 if (info.schema_changed)
35 set_table(root_table);
37 // First check if any of the tables accessible from the root table were
38 // actually modified. This can be false if there were only insertions, or
39 // deletions which were not linked to by any row in the linking table
40 auto table_modified = [&](auto& tbl) {
// The bounds test guards against tables that have no entry in info.tables
41 return tbl.table_ndx < info.tables.size()
42 && !info.tables[tbl.table_ndx].modifications.empty();
44 if (!any_of(begin(m_related_tables), end(m_related_tables), table_modified)) {
45 return [](size_t) { return false; };
// A single related table means there are no links to follow, so membership
// in that table's modification set is the whole answer
47 if (m_related_tables.size() == 1) {
48 auto& modifications = info.tables[m_related_tables[0].table_ndx].modifications;
49 return [&](size_t row) { return modifications.contains(row); };
52 return DeepChangeChecker(info, root_table, m_related_tables);
// Recursively collects `table` and every table reachable from it through
// Link / LinkList columns into `out`.
//
// Each collected table gets one entry keyed by its index in the group. For
// every Link or LinkList column the column index is appended to that entry's
// `links`, together with a flag telling whether the column is a LinkList, and
// the link target table is visited in turn. The checks at the top test for a
// table without a group index (`npos`) and for a table that already has an
// entry in `out`.
// NOTE(review): presumably both checks end the call early, which is what
// stops the recursion on cyclic link graphs - confirm.
55 void DeepChangeChecker::find_related_tables(std::vector<RelatedTable>& out, Table const& table)
57 auto table_ndx = table.get_index_in_group();
58 if (table_ndx == npos)
60 if (any_of(begin(out), end(out), [=](auto& tbl) { return tbl.table_ndx == table_ndx; }))
63 // We need to add this table to `out` before recurring so that the check
64 // above works, but we can't store a pointer to the thing being populated
65 // because the recursive calls may resize `out`, so instead look it up by
67 size_t out_index = out.size();
68 out.push_back({table_ndx, {}});
70 for (size_t i = 0, count = table.get_column_count(); i != count; ++i) {
71 auto type = table.get_column_type(i);
72 if (type == type_Link || type == type_LinkList) {
// Re-index `out` on every use; a reference taken earlier could dangle after
// the recursive call below grows the vector
73 out[out_index].links.push_back({i, type == type_LinkList});
74 find_related_tables(out, *table.get_link_target(i));
// Sets up a checker for rows of `root_table` against the changes in `info`.
//
// Records the root table and its index in the group, and initializes
// m_related_tables from `related_tables`. m_root_modifications points at the
// root table's modification set inside `info`, or is null when `info.tables`
// has no entry at the root table's index.
79 DeepChangeChecker::DeepChangeChecker(TransactionChangeInfo const& info,
80 Table const& root_table,
81 std::vector<RelatedTable> const& related_tables)
83 , m_root_table(root_table)
84 , m_root_table_ndx(root_table.get_index_in_group())
85 , m_root_modifications(m_root_table_ndx < info.tables.size() ? &info.tables[m_root_table_ndx].modifications : nullptr)
86 , m_related_tables(related_tables)
// Determines whether any row linked to from row `row_ndx` of `table` is
// modified, using the outgoing-link description stored for `table_ndx` in
// m_related_tables. Link targets are examined with check_row() at
// `depth + 1`, and the overall result is whether `linked_object_changed`
// holds for any of the table's recorded links.
//
// `already_checking` scans the first `depth` entries of m_current_path for an
// entry with the same (table, row, col); entries from such a match up to
// `depth` are flagged `depth_exceeded`. The current (table, row, col) step is
// written to m_current_path[depth].
// `linked_object_changed` contains both a single-link path
// (is_null_link / get_link) and a link-list path (get_linklist, one
// check_row() call per element).
// NOTE(review): the choice between those two paths is presumably keyed on the
// is-list flag recorded by find_related_tables - confirm.
90 bool DeepChangeChecker::check_outgoing_links(size_t table_ndx,
92 size_t row_ndx, size_t depth)
94 auto it = find_if(begin(m_related_tables), end(m_related_tables),
95 [&](auto&& tbl) { return tbl.table_ndx == table_ndx; });
96 if (it == m_related_tables.end())
99 // Check if we're already checking if the destination of the link is
100 // modified, and if not add it to the stack
101 auto already_checking = [&](size_t col) {
// Only the entries above the current depth form the active path
102 auto end = m_current_path.begin() + depth;
103 auto match = std::find_if(m_current_path.begin(), end, [&](auto& p) {
104 return p.table == table_ndx && p.row == row_ndx && p.col == col;
107 for (; match < end; ++match) match->depth_exceeded = true;
110 m_current_path[depth] = {table_ndx, row_ndx, col, false};
114 auto linked_object_changed = [&](OutgoingLink const& link) {
115 if (already_checking(link.col_ndx))
118 if (table.is_null_link(link.col_ndx, row_ndx))
120 auto dst = table.get_link(link.col_ndx, row_ndx);
121 return check_row(*table.get_link_target(link.col_ndx), dst, depth + 1);
124 auto& target = *table.get_link_target(link.col_ndx);
125 auto lvr = table.get_linklist(link.col_ndx, row_ndx);
126 for (size_t j = 0, size = lvr->size(); j < size; ++j) {
127 size_t dst = lvr->get(j).get_index();
128 if (check_row(target, dst, depth + 1))
134 return std::any_of(begin(it->links), end(it->links), linked_object_changed);
// Decides whether row `idx` of `table` counts as modified when it is reached
// at recursion depth `depth`.
//
// The search depth is bounded by m_current_path.size(); on reaching that
// bound every entry of m_current_path is flagged `depth_exceeded`. For
// depth > 0 the row is looked up in the table's modification set in m_info.
// m_not_modified (grown on demand to cover `table_ndx`) holds rows for which
// an earlier check came back negative. Otherwise the outcome comes from
// check_outgoing_links(); a negative outcome is recorded in m_not_modified at
// depth 0, or at deeper levels when the parent path entry has not been
// flagged `depth_exceeded`.
// NOTE(review): the values produced by the depth-limit, modification-set and
// cache checks are presumably false, true and false respectively - confirm.
137 bool DeepChangeChecker::check_row(Table const& table, size_t idx, size_t depth)
139 // Arbitrary upper limit on the maximum depth to search
140 if (depth >= m_current_path.size()) {
141 // Don't mark any of the intermediate rows checked along the path as
142 // not modified, as a search starting from them might hit a modification
143 for (size_t i = 0; i < m_current_path.size(); ++i)
144 m_current_path[i].depth_exceeded = true;
148 size_t table_ndx = table.get_index_in_group();
149 if (depth > 0 && table_ndx < m_info.tables.size() && m_info.tables[table_ndx].modifications.contains(idx))
152 if (m_not_modified.size() <= table_ndx)
153 m_not_modified.resize(table_ndx + 1);
154 if (m_not_modified[table_ndx].contains(idx))
157 bool ret = check_outgoing_links(table_ndx, table, idx, depth);
// A negative result is only trustworthy if the search below was not cut
// short, hence the depth_exceeded test before caching it
158 if (!ret && (depth == 0 || !m_current_path[depth - 1].depth_exceeded))
159 m_not_modified[table_ndx].add(idx);
// Checks row `ndx` of the root table. The root table's own modification set
// is consulted first (when one is available); the deep check is
// check_row(m_root_table, ndx, 0).
// NOTE(review): presumably a hit in the root modification set yields true
// without running the deep check - confirm.
163 bool DeepChangeChecker::operator()(size_t ndx)
165 if (m_root_modifications && m_root_modifications->contains(ndx))
167 return check_row(m_root_table, ndx, 0);
// Creates a notifier for `realm`. The shared_ptr is moved into m_realm, and
// m_sg_version is initialized to the version of the current transaction of
// that Realm's shared group.
170 CollectionNotifier::CollectionNotifier(std::shared_ptr<Realm> realm)
171 : m_realm(std::move(realm))
172 , m_sg_version(Realm::Internal::get_shared_group(*m_realm)->get_version_of_current_transaction())
// Destructor.
// NOTE(review): per the comment below, m_realm is presumably released here
// while the realm mutex is held (e.g. through unregister()) - confirm.
176 CollectionNotifier::~CollectionNotifier()
178 // Need to do this explicitly to ensure m_realm is destroyed with the mutex
179 // held to avoid potential double-deletion
// Registers `callback` with this notifier.
//
// The calling thread is checked with m_realm->verify_thread(). Holding
// m_callback_mutex, a fresh token is taken from m_next_token and a new entry
// carrying the callback and that token is appended to m_callbacks. When no
// delivery pass is running (m_callback_index == npos) the coordinator's
// notifier worker is woken up and m_have_callbacks is set.
// NOTE(review): presumably the freshly assigned token is the return value,
// for later use with remove_callback() - confirm.
183 uint64_t CollectionNotifier::add_callback(CollectionChangeCallback callback)
185 m_realm->verify_thread();
187 std::lock_guard<std::mutex> lock(m_callback_mutex);
188 auto token = m_next_token++;
189 m_callbacks.push_back({std::move(callback), {}, {}, token, false, false});
190 if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
191 Realm::Internal::get_coordinator(*m_realm).wake_up_notifier_worker();
192 m_have_callbacks = true;
// Unregisters the callback identified by `token`.
//
// Holding m_callback_mutex, the entry is looked up with find_callback(),
// moved into `old`, and erased from m_callbacks; m_have_callbacks is then
// refreshed from whether any callbacks remain. The entry's position is
// compared against m_callback_index when that cursor is not npos.
// NOTE(review): presumably the cursor and the callback count are decremented
// there so that a pass in progress neither skips nor repeats an entry -
// confirm.
197 void CollectionNotifier::remove_callback(uint64_t token)
199 // the callback needs to be destroyed after releasing the lock as destroying
200 // it could cause user code to be called
203 std::lock_guard<std::mutex> lock(m_callback_mutex);
204 auto it = find_callback(token);
205 if (it == end(m_callbacks)) {
209 size_t idx = distance(begin(m_callbacks), it);
210 if (m_callback_index != npos) {
211 if (m_callback_index >= idx)
// Keep the entry alive past the erase so its destructor runs later
216 old = std::move(*it);
217 m_callbacks.erase(it);
219 m_have_callbacks = !m_callbacks.empty();
// Flags the callback identified by `token` by setting its `skip_next`.
//
// Holding m_realm_mutex, asserts that m_realm is set and checks the calling
// thread and the write-transaction state through verify_thread() and
// verify_in_write(). Then, holding m_callback_mutex, sets `skip_next` on the
// entry returned by find_callback() when one is found.
// NOTE(review): the realm-mutex lock is presumably confined to an inner scope
// and released before m_callback_mutex is taken - confirm.
223 void CollectionNotifier::suppress_next_notification(uint64_t token)
226 std::lock_guard<std::mutex> lock(m_realm_mutex);
227 REALM_ASSERT(m_realm);
228 m_realm->verify_thread();
229 m_realm->verify_in_write();
232 std::lock_guard<std::mutex> lock(m_callback_mutex);
233 auto it = find_callback(token);
234 if (it != end(m_callbacks)) {
235 it->skip_next = true;
// Looks up the entry of m_callbacks whose token equals `token`.
//
// Unless m_error is set, asserts that at least one callback is registered and
// that the lookup succeeded. No lock is taken here; the callers in this file
// hold m_callback_mutex around the call.
// NOTE(review): presumably the iterator from the lookup is returned, equal to
// end(m_callbacks) when nothing matched - confirm.
239 std::vector<CollectionNotifier::Callback>::iterator CollectionNotifier::find_callback(uint64_t token)
241 REALM_ASSERT(m_error || m_callbacks.size() > 0);
243 auto it = find_if(begin(m_callbacks), end(m_callbacks),
244 [=](const auto& c) { return c.token == token; });
245 // We should only fail to find the callback if it was removed due to an error
246 REALM_ASSERT(m_error || it != end(m_callbacks));
// Holds m_realm_mutex for the duration of the call.
// NOTE(review): presumably clears m_realm under that lock, which is the state
// is_alive() tests for - confirm.
250 void CollectionNotifier::unregister() noexcept
252 std::lock_guard<std::mutex> lock(m_realm_mutex);
// Returns true while m_realm is non-null. The pointer is read with
// m_realm_mutex held.
256 bool CollectionNotifier::is_alive() const noexcept
258 std::lock_guard<std::mutex> lock(m_realm_mutex);
259 return m_realm != nullptr;
// Locks m_realm_mutex and hands the owning std::unique_lock to the caller;
// the mutex stays locked until the returned lock is released or destroyed.
262 std::unique_lock<std::mutex> CollectionNotifier::lock_target()
264 return std::unique_lock<std::mutex>{m_realm_mutex};
// Rebuilds m_related_tables from scratch: the list is emptied and then
// repopulated by DeepChangeChecker::find_related_tables() starting at `table`.
267 void CollectionNotifier::set_table(Table const& table)
269 m_related_tables.clear();
270 DeepChangeChecker::find_related_tables(m_related_tables, table);
// Marks every table in m_related_tables as needing modification tracking by
// setting its slot in `info.table_modifications_needed` to true.
//
// do_add_required_change_info(info) is invoked first. The vector is grown
// (new slots false) when the largest related table index does not fit yet.
// NOTE(review): presumably nothing is marked when
// do_add_required_change_info() returns false or there are no related
// tables - confirm.
273 void CollectionNotifier::add_required_change_info(TransactionChangeInfo& info)
275 if (!do_add_required_change_info(info) || m_related_tables.empty()) {
// Largest table index among the related tables, used to size the flag vector
279 auto max = max_element(begin(m_related_tables), end(m_related_tables),
280 [](auto&& a, auto&& b) { return a.table_ndx < b.table_ndx; });
282 if (max->table_ndx >= info.table_modifications_needed.size())
283 info.table_modifications_needed.resize(max->table_ndx + 1, false);
284 for (auto& tbl : m_related_tables) {
285 info.table_modifications_needed[tbl.table_ndx] = true;
// Records the version of m_sg's current transaction in m_sg_version and then
// calls do_prepare_handover(*m_sg). The loop at the end, run with
// m_callback_mutex held, asserts that no callback still has `skip_next` set.
289 void CollectionNotifier::prepare_handover()
292 m_sg_version = m_sg->get_version_of_current_transaction();
293 do_prepare_handover(*m_sg);
297 std::lock_guard<std::mutex> lock(m_callback_mutex);
298 for (auto& callback : m_callbacks)
299 REALM_ASSERT(!callback.skip_next);
// Visits the callbacks through for_each_callback(). For each one the pending
// `changes_to_deliver` is tested for emptiness and then copied (not moved, so
// the callback entry keeps its own copy), and the callback function is copied
// into a local.
// NOTE(review): presumably callbacks with nothing to deliver are skipped and
// the local function copy is invoked with the copied changes after releasing
// the lock - confirm.
303 void CollectionNotifier::before_advance()
305 for_each_callback([&](auto& lock, auto& callback) {
306 if (callback.changes_to_deliver.empty()) {
310 auto changes = callback.changes_to_deliver;
311 // acquire a local reference to the callback so that removing the
312 // callback from within it can't result in a dangling pointer
313 auto cb = callback.fn;
// Visits the callbacks through for_each_callback(). A callback is tested for
// having had its initial delivery already and having no pending changes.
// Otherwise `initial_delivered` is set, the pending changes are moved out of
// the entry, and the callback function is copied into a local.
// NOTE(review): presumably callbacks matching that test are skipped and the
// local function copy is invoked with the moved changes after releasing the
// lock - confirm.
319 void CollectionNotifier::after_advance()
321 for_each_callback([&](auto& lock, auto& callback) {
322 if (callback.initial_delivered && callback.changes_to_deliver.empty()) {
325 callback.initial_delivered = true;
327 auto changes = std::move(callback.changes_to_deliver);
328 // acquire a local reference to the callback so that removing the
329 // callback from within it can't result in a dangling pointer
330 auto cb = callback.fn;
// Error pass over the registered callbacks for `error`.
//
// m_callback_count is set to the current number of callbacks, then
// for_each_callback() visits each entry: its function object is moved out,
// its token is read, and the entry is removed through remove_callback().
// NOTE(review): presumably the moved-out function is invoked with `error`
// (lock released) before the entry is removed - confirm.
336 void CollectionNotifier::deliver_error(std::exception_ptr error)
338 // Don't complain about double-unregistering callbacks
341 m_callback_count = m_callbacks.size();
342 for_each_callback([this, &error](auto& lock, auto& callback) {
343 // acquire a local reference to the callback so that removing the
344 // callback from within it can't result in a dangling pointer
345 auto cb = std::move(callback.fn);
346 auto token = callback.token;
350 // We never want to call the callback again after this, so just remove it
351 this->remove_callback(token);
// Returns true when m_realm points at `realm` (address comparison). The
// pointer is read with m_realm_mutex held.
355 bool CollectionNotifier::is_for_realm(Realm& realm) const noexcept
357 std::lock_guard<std::mutex> lock(m_realm_mutex);
358 return m_realm.get() == &realm;
// Converts the accumulated changes of every callback into its deliverable
// form.
//
// prepare_to_deliver() is consulted first. Holding m_callback_mutex, each
// callback's `accumulated_changes` is moved from and finalized into
// `changes_to_deliver`, and m_callback_count is set to the number of
// callbacks present at that moment.
// NOTE(review): presumably returns false when prepare_to_deliver() fails and
// true otherwise - confirm.
361 bool CollectionNotifier::package_for_delivery()
363 if (!prepare_to_deliver())
365 std::lock_guard<std::mutex> l(m_callback_mutex);
366 for (auto& callback : m_callbacks)
367 callback.changes_to_deliver = std::move(callback.accumulated_changes).finalize();
368 m_callback_count = m_callbacks.size();
// Runs `fn(lock, callback)` for the callbacks at indices below
// m_callback_count, using the member m_callback_index as the cursor so that
// other member functions can observe the position of the pass.
//
// `fn` receives the std::unique_lock on m_callback_mutex and may release it;
// the lock is re-acquired after the call when it is no longer owned. The
// cursor is reset to npos once the loop is over.
// NOTE(review): the loop begins with ++m_callback_index, which presumably
// relies on npos + 1 wrapping around to 0 - confirm.
372 template<typename Fn>
373 void CollectionNotifier::for_each_callback(Fn&& fn)
375 std::unique_lock<std::mutex> callback_lock(m_callback_mutex);
376 REALM_ASSERT_DEBUG(m_callback_count <= m_callbacks.size());
377 for (++m_callback_index; m_callback_index < m_callback_count; ++m_callback_index) {
378 fn(callback_lock, m_callbacks[m_callback_index]);
379 if (!callback_lock.owns_lock())
380 callback_lock.lock();
383 m_callback_index = npos;
// NOTE(review): presumably binds `sg` as the shared group later used through
// m_sg (see detach() and prepare_handover()) - confirm.
386 void CollectionNotifier::attach_to(SharedGroup& sg)
// Invokes do_detach_from() with the shared group currently referenced by m_sg.
// NOTE(review): m_sg is dereferenced, so it must be non-null at this point;
// presumably it is reset afterwards - confirm.
394 void CollectionNotifier::detach()
397 do_detach_from(*m_sg);
// Returns the shared group obtained from m_realm through
// Realm::Internal::get_shared_group(). m_realm is dereferenced without a
// null check.
401 SharedGroup& CollectionNotifier::source_shared_group()
403 return *Realm::Internal::get_shared_group(*m_realm);
// Merges `change` into the accumulated changes of the registered callbacks,
// holding m_callback_mutex throughout.
//
// A callback with `skip_next` set has that flag cleared (after a debug
// assertion that it has nothing accumulated). `change` is moved into the last
// callback of m_callbacks, while a copy of it is merged elsewhere, so the
// original is only consumed at the final element.
// NOTE(review): presumably a callback with `skip_next` set does not receive
// this change, and the copy-merge is the alternative branch to the
// move-merge - confirm.
406 void CollectionNotifier::add_changes(CollectionChangeBuilder change)
408 std::lock_guard<std::mutex> lock(m_callback_mutex);
409 for (auto& callback : m_callbacks) {
410 if (callback.skip_next) {
411 REALM_ASSERT_DEBUG(callback.accumulated_changes.empty());
412 callback.skip_next = false;
415 if (&callback == &m_callbacks.back())
416 callback.accumulated_changes.merge(std::move(change));
418 callback.accumulated_changes.merge(CollectionChangeBuilder(change));
// Stores the given notifiers (moved), the coordinator pointer, and the
// pending error (moved) for later packaging and delivery.
423 NotifierPackage::NotifierPackage(std::exception_ptr error,
424 std::vector<std::shared_ptr<CollectionNotifier>> notifiers,
425 RealmCoordinator* coordinator)
426 : m_notifiers(std::move(notifiers))
427 , m_coordinator(coordinator)
428 , m_error(std::move(error))
// Waits on the coordinator for the notifiers and packages what they have to
// deliver.
//
// The guard at the top tests for a missing coordinator, a pending error, and
// `!*this`. The wait predicate requires every notifier either to have no
// callbacks or to have run and reached at least `*target_version`. After the
// wait, each notifier that has run and whose package_for_delivery() succeeds
// contributes its version to m_version; `package` also serves as the
// remove_if predicate used to prune m_notifiers. If the resulting version is
// older than the requested target, m_version is reset to none. m_coordinator
// is cleared at the end.
// NOTE(review): presumably the guard returns early, the wait predicate is
// trivially true when no target version is given, and m_notifiers is also
// emptied when the version falls short of the target - confirm.
432 void NotifierPackage::package_and_wait(util::Optional<VersionID::version_type> target_version)
434 if (!m_coordinator || m_error || !*this)
437 auto lock = m_coordinator->wait_for_notifiers([&] {
440 return std::all_of(begin(m_notifiers), end(m_notifiers), [&](auto const& n) {
441 return !n->have_callbacks() || (n->has_run() && n->version().version >= *target_version);
445 // Package the notifiers for delivery and remove any which don't have anything to deliver
446 auto package = [&](auto& notifier) {
447 if (notifier->has_run() && notifier->package_for_delivery()) {
448 m_version = notifier->version();
453 m_notifiers.erase(std::remove_if(begin(m_notifiers), end(m_notifiers), package), end(m_notifiers));
454 if (m_version && target_version && m_version->version < *target_version) {
456 m_version = util::none;
// Any notifier that survived pruning must have supplied a version
458 REALM_ASSERT(m_version || m_notifiers.empty());
460 m_coordinator = nullptr;
// Forwards before_advance() to every notifier in m_notifiers, in order.
463 void NotifierPackage::before_advance()
467 for (auto& notifier : m_notifiers)
468 notifier->before_advance();
// Hands the packaged results to the notifiers. Two loops are present: one
// passing m_error to each notifier's deliver_error(), and one calling
// deliver(sg) on each notifier. The transaction stage of `sg` is compared
// against SharedGroup::transact_Reading in between.
// NOTE(review): presumably the error loop runs only when m_error is set and
// ends the call, and nothing is delivered unless `sg` is in a read
// transaction - confirm.
471 void NotifierPackage::deliver(SharedGroup& sg)
474 for (auto& notifier : m_notifiers)
475 notifier->deliver_error(m_error);
478 // Can't deliver while in a write transaction
479 if (sg.get_transact_stage() != SharedGroup::transact_Reading)
481 for (auto& notifier : m_notifiers)
482 notifier->deliver(sg);
// Forwards after_advance() to every notifier in m_notifiers, in order.
485 void NotifierPackage::after_advance()
489 for (auto& notifier : m_notifiers)
490 notifier->after_advance();
// Appends `notifier` to m_notifiers and registers it with the coordinator.
// m_coordinator is dereferenced without a null check, so it must still be set
// when this is called.
493 void NotifierPackage::add_notifier(std::shared_ptr<CollectionNotifier> notifier)
495 m_notifiers.push_back(notifier);
496 m_coordinator->register_notifier(notifier);