X-Git-Url: https://git.mdrn.pl/wl-app.git/blobdiff_plain/53b27422d140022594fc241cca91c3183be57bca..48b2fe9f7c2dc3d9aeaaa6dbfb27c7da4f3235ff:/iOS/Pods/Realm/Realm/ObjectStore/src/impl/collection_notifier.cpp diff --git a/iOS/Pods/Realm/Realm/ObjectStore/src/impl/collection_notifier.cpp b/iOS/Pods/Realm/Realm/ObjectStore/src/impl/collection_notifier.cpp new file mode 100644 index 0000000..8fa94d3 --- /dev/null +++ b/iOS/Pods/Realm/Realm/ObjectStore/src/impl/collection_notifier.cpp @@ -0,0 +1,497 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2016 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "impl/collection_notifier.hpp" + +#include "impl/realm_coordinator.hpp" +#include "shared_realm.hpp" + +#include +#include + +using namespace realm; +using namespace realm::_impl; + +std::function +CollectionNotifier::get_modification_checker(TransactionChangeInfo const& info, + Table const& root_table) +{ + if (info.schema_changed) + set_table(root_table); + + // First check if any of the tables accessible from the root table were + // actually modified. This can be false if there were only insertions, or + // deletions which were not linked to by any row in the linking table + auto table_modified = [&](auto& tbl) { + return tbl.table_ndx < info.tables.size() + && !info.tables[tbl.table_ndx].modifications.empty(); + }; + if (!any_of(begin(m_related_tables), end(m_related_tables), table_modified)) { + return [](size_t) { return false; }; + } + if (m_related_tables.size() == 1) { + auto& modifications = info.tables[m_related_tables[0].table_ndx].modifications; + return [&](size_t row) { return modifications.contains(row); }; + } + + return DeepChangeChecker(info, root_table, m_related_tables); +} + +void DeepChangeChecker::find_related_tables(std::vector& out, Table const& table) +{ + auto table_ndx = table.get_index_in_group(); + if (table_ndx == npos) + return; + if (any_of(begin(out), end(out), [=](auto& tbl) { return tbl.table_ndx == table_ndx; })) + return; + + // We need to add this table to `out` before recurring so that the check + // above works, but we can't store a pointer to the thing being populated + // because the recursive calls may resize `out`, so instead look it up by + // index every time + size_t out_index = out.size(); + out.push_back({table_ndx, {}}); + + for (size_t i = 0, count = table.get_column_count(); i != count; ++i) { + auto type = table.get_column_type(i); + if (type == type_Link || type == type_LinkList) { + out[out_index].links.push_back({i, type == type_LinkList}); + find_related_tables(out, *table.get_link_target(i)); + } + } +} + +DeepChangeChecker::DeepChangeChecker(TransactionChangeInfo const& info, + Table const& root_table, + std::vector const& related_tables) +: m_info(info) +, m_root_table(root_table) +, m_root_table_ndx(root_table.get_index_in_group()) +, m_root_modifications(m_root_table_ndx < info.tables.size() ? &info.tables[m_root_table_ndx].modifications : nullptr) +, m_related_tables(related_tables) +{ +} + +bool DeepChangeChecker::check_outgoing_links(size_t table_ndx, + Table const& table, + size_t row_ndx, size_t depth) +{ + auto it = find_if(begin(m_related_tables), end(m_related_tables), + [&](auto&& tbl) { return tbl.table_ndx == table_ndx; }); + if (it == m_related_tables.end()) + return false; + + // Check if we're already checking if the destination of the link is + // modified, and if not add it to the stack + auto already_checking = [&](size_t col) { + auto end = m_current_path.begin() + depth; + auto match = std::find_if(m_current_path.begin(), end, [&](auto& p) { + return p.table == table_ndx && p.row == row_ndx && p.col == col; + }); + if (match != end) { + for (; match < end; ++match) match->depth_exceeded = true; + return true; + } + m_current_path[depth] = {table_ndx, row_ndx, col, false}; + return false; + }; + + auto linked_object_changed = [&](OutgoingLink const& link) { + if (already_checking(link.col_ndx)) + return false; + if (!link.is_list) { + if (table.is_null_link(link.col_ndx, row_ndx)) + return false; + auto dst = table.get_link(link.col_ndx, row_ndx); + return check_row(*table.get_link_target(link.col_ndx), dst, depth + 1); + } + + auto& target = *table.get_link_target(link.col_ndx); + auto lvr = table.get_linklist(link.col_ndx, row_ndx); + for (size_t j = 0, size = lvr->size(); j < size; ++j) { + size_t dst = lvr->get(j).get_index(); + if (check_row(target, dst, depth + 1)) + return true; + } + return false; + }; + + return std::any_of(begin(it->links), end(it->links), linked_object_changed); +} + +bool DeepChangeChecker::check_row(Table const& table, size_t idx, size_t depth) +{ + // Arbitrary upper limit on the maximum depth to search + if (depth >= m_current_path.size()) { + // Don't mark any of the intermediate rows checked along the path as + // not modified, as a search starting from them might hit a modification + for (size_t i = 0; i < m_current_path.size(); ++i) + m_current_path[i].depth_exceeded = true; + return false; + } + + size_t table_ndx = table.get_index_in_group(); + if (depth > 0 && table_ndx < m_info.tables.size() && m_info.tables[table_ndx].modifications.contains(idx)) + return true; + + if (m_not_modified.size() <= table_ndx) + m_not_modified.resize(table_ndx + 1); + if (m_not_modified[table_ndx].contains(idx)) + return false; + + bool ret = check_outgoing_links(table_ndx, table, idx, depth); + if (!ret && (depth == 0 || !m_current_path[depth - 1].depth_exceeded)) + m_not_modified[table_ndx].add(idx); + return ret; +} + +bool DeepChangeChecker::operator()(size_t ndx) +{ + if (m_root_modifications && m_root_modifications->contains(ndx)) + return true; + return check_row(m_root_table, ndx, 0); +} + +CollectionNotifier::CollectionNotifier(std::shared_ptr realm) +: m_realm(std::move(realm)) +, m_sg_version(Realm::Internal::get_shared_group(*m_realm)->get_version_of_current_transaction()) +{ +} + +CollectionNotifier::~CollectionNotifier() +{ + // Need to do this explicitly to ensure m_realm is destroyed with the mutex + // held to avoid potential double-deletion + unregister(); +} + +uint64_t CollectionNotifier::add_callback(CollectionChangeCallback callback) +{ + m_realm->verify_thread(); + + std::lock_guard lock(m_callback_mutex); + auto token = m_next_token++; + m_callbacks.push_back({std::move(callback), {}, {}, token, false, false}); + if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications + Realm::Internal::get_coordinator(*m_realm).wake_up_notifier_worker(); + m_have_callbacks = true; + } + return token; +} + +void CollectionNotifier::remove_callback(uint64_t token) +{ + // the callback needs to be destroyed after releasing the lock as destroying + // it could cause user code to be called + Callback old; + { + std::lock_guard lock(m_callback_mutex); + auto it = find_callback(token); + if (it == end(m_callbacks)) { + return; + } + + size_t idx = distance(begin(m_callbacks), it); + if (m_callback_index != npos) { + if (m_callback_index >= idx) + --m_callback_index; + } + --m_callback_count; + + old = std::move(*it); + m_callbacks.erase(it); + + m_have_callbacks = !m_callbacks.empty(); + } +} + +void CollectionNotifier::suppress_next_notification(uint64_t token) +{ + { + std::lock_guard lock(m_realm_mutex); + REALM_ASSERT(m_realm); + m_realm->verify_thread(); + m_realm->verify_in_write(); + } + + std::lock_guard lock(m_callback_mutex); + auto it = find_callback(token); + if (it != end(m_callbacks)) { + it->skip_next = true; + } +} + +std::vector::iterator CollectionNotifier::find_callback(uint64_t token) +{ + REALM_ASSERT(m_error || m_callbacks.size() > 0); + + auto it = find_if(begin(m_callbacks), end(m_callbacks), + [=](const auto& c) { return c.token == token; }); + // We should only fail to find the callback if it was removed due to an error + REALM_ASSERT(m_error || it != end(m_callbacks)); + return it; +} + +void CollectionNotifier::unregister() noexcept +{ + std::lock_guard lock(m_realm_mutex); + m_realm = nullptr; +} + +bool CollectionNotifier::is_alive() const noexcept +{ + std::lock_guard lock(m_realm_mutex); + return m_realm != nullptr; +} + +std::unique_lock CollectionNotifier::lock_target() +{ + return std::unique_lock{m_realm_mutex}; +} + +void CollectionNotifier::set_table(Table const& table) +{ + m_related_tables.clear(); + DeepChangeChecker::find_related_tables(m_related_tables, table); +} + +void CollectionNotifier::add_required_change_info(TransactionChangeInfo& info) +{ + if (!do_add_required_change_info(info) || m_related_tables.empty()) { + return; + } + + auto max = max_element(begin(m_related_tables), end(m_related_tables), + [](auto&& a, auto&& b) { return a.table_ndx < b.table_ndx; }); + + if (max->table_ndx >= info.table_modifications_needed.size()) + info.table_modifications_needed.resize(max->table_ndx + 1, false); + for (auto& tbl : m_related_tables) { + info.table_modifications_needed[tbl.table_ndx] = true; + } +} + +void CollectionNotifier::prepare_handover() +{ + REALM_ASSERT(m_sg); + m_sg_version = m_sg->get_version_of_current_transaction(); + do_prepare_handover(*m_sg); + m_has_run = true; + +#ifdef REALM_DEBUG + std::lock_guard lock(m_callback_mutex); + for (auto& callback : m_callbacks) + REALM_ASSERT(!callback.skip_next); +#endif +} + +void CollectionNotifier::before_advance() +{ + for_each_callback([&](auto& lock, auto& callback) { + if (callback.changes_to_deliver.empty()) { + return; + } + + auto changes = callback.changes_to_deliver; + // acquire a local reference to the callback so that removing the + // callback from within it can't result in a dangling pointer + auto cb = callback.fn; + lock.unlock(); + cb.before(changes); + }); +} + +void CollectionNotifier::after_advance() +{ + for_each_callback([&](auto& lock, auto& callback) { + if (callback.initial_delivered && callback.changes_to_deliver.empty()) { + return; + } + callback.initial_delivered = true; + + auto changes = std::move(callback.changes_to_deliver); + // acquire a local reference to the callback so that removing the + // callback from within it can't result in a dangling pointer + auto cb = callback.fn; + lock.unlock(); + cb.after(changes); + }); +} + +void CollectionNotifier::deliver_error(std::exception_ptr error) +{ + // Don't complain about double-unregistering callbacks + m_error = true; + + m_callback_count = m_callbacks.size(); + for_each_callback([this, &error](auto& lock, auto& callback) { + // acquire a local reference to the callback so that removing the + // callback from within it can't result in a dangling pointer + auto cb = std::move(callback.fn); + auto token = callback.token; + lock.unlock(); + cb.error(error); + + // We never want to call the callback again after this, so just remove it + this->remove_callback(token); + }); +} + +bool CollectionNotifier::is_for_realm(Realm& realm) const noexcept +{ + std::lock_guard lock(m_realm_mutex); + return m_realm.get() == &realm; +} + +bool CollectionNotifier::package_for_delivery() +{ + if (!prepare_to_deliver()) + return false; + std::lock_guard l(m_callback_mutex); + for (auto& callback : m_callbacks) + callback.changes_to_deliver = std::move(callback.accumulated_changes).finalize(); + m_callback_count = m_callbacks.size(); + return true; +} + +template +void CollectionNotifier::for_each_callback(Fn&& fn) +{ + std::unique_lock callback_lock(m_callback_mutex); + REALM_ASSERT_DEBUG(m_callback_count <= m_callbacks.size()); + for (++m_callback_index; m_callback_index < m_callback_count; ++m_callback_index) { + fn(callback_lock, m_callbacks[m_callback_index]); + if (!callback_lock.owns_lock()) + callback_lock.lock(); + } + + m_callback_index = npos; +} + +void CollectionNotifier::attach_to(SharedGroup& sg) +{ + REALM_ASSERT(!m_sg); + + m_sg = &sg; + do_attach_to(sg); +} + +void CollectionNotifier::detach() +{ + REALM_ASSERT(m_sg); + do_detach_from(*m_sg); + m_sg = nullptr; +} + +SharedGroup& CollectionNotifier::source_shared_group() +{ + return *Realm::Internal::get_shared_group(*m_realm); +} + +void CollectionNotifier::add_changes(CollectionChangeBuilder change) +{ + std::lock_guard lock(m_callback_mutex); + for (auto& callback : m_callbacks) { + if (callback.skip_next) { + REALM_ASSERT_DEBUG(callback.accumulated_changes.empty()); + callback.skip_next = false; + } + else { + if (&callback == &m_callbacks.back()) + callback.accumulated_changes.merge(std::move(change)); + else + callback.accumulated_changes.merge(CollectionChangeBuilder(change)); + } + } +} + +NotifierPackage::NotifierPackage(std::exception_ptr error, + std::vector> notifiers, + RealmCoordinator* coordinator) +: m_notifiers(std::move(notifiers)) +, m_coordinator(coordinator) +, m_error(std::move(error)) +{ +} + +void NotifierPackage::package_and_wait(util::Optional target_version) +{ + if (!m_coordinator || m_error || !*this) + return; + + auto lock = m_coordinator->wait_for_notifiers([&] { + if (!target_version) + return true; + return std::all_of(begin(m_notifiers), end(m_notifiers), [&](auto const& n) { + return !n->have_callbacks() || (n->has_run() && n->version().version >= *target_version); + }); + }); + + // Package the notifiers for delivery and remove any which don't have anything to deliver + auto package = [&](auto& notifier) { + if (notifier->has_run() && notifier->package_for_delivery()) { + m_version = notifier->version(); + return false; + } + return true; + }; + m_notifiers.erase(std::remove_if(begin(m_notifiers), end(m_notifiers), package), end(m_notifiers)); + if (m_version && target_version && m_version->version < *target_version) { + m_notifiers.clear(); + m_version = util::none; + } + REALM_ASSERT(m_version || m_notifiers.empty()); + + m_coordinator = nullptr; +} + +void NotifierPackage::before_advance() +{ + if (m_error) + return; + for (auto& notifier : m_notifiers) + notifier->before_advance(); +} + +void NotifierPackage::deliver(SharedGroup& sg) +{ + if (m_error) { + for (auto& notifier : m_notifiers) + notifier->deliver_error(m_error); + return; + } + // Can't deliver while in a write transaction + if (sg.get_transact_stage() != SharedGroup::transact_Reading) + return; + for (auto& notifier : m_notifiers) + notifier->deliver(sg); +} + +void NotifierPackage::after_advance() +{ + if (m_error) + return; + for (auto& notifier : m_notifiers) + notifier->after_advance(); +} + +void NotifierPackage::add_notifier(std::shared_ptr notifier) +{ + m_notifiers.push_back(notifier); + m_coordinator->register_notifier(notifier); +}