added iOS source code
[wl-app.git] / iOS / Pods / Realm / Realm / ObjectStore / src / impl / collection_notifier.cpp
diff --git a/iOS/Pods/Realm/Realm/ObjectStore/src/impl/collection_notifier.cpp b/iOS/Pods/Realm/Realm/ObjectStore/src/impl/collection_notifier.cpp
new file mode 100644 (file)
index 0000000..8fa94d3
--- /dev/null
@@ -0,0 +1,497 @@
+////////////////////////////////////////////////////////////////////////////
+//
+// Copyright 2016 Realm Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////
+
+#include "impl/collection_notifier.hpp"
+
+#include "impl/realm_coordinator.hpp"
+#include "shared_realm.hpp"
+
+#include <realm/group_shared.hpp>
+#include <realm/link_view.hpp>
+
+using namespace realm;
+using namespace realm::_impl;
+
+std::function<bool (size_t)>
+CollectionNotifier::get_modification_checker(TransactionChangeInfo const& info,
+                                             Table const& root_table)
+{
+    if (info.schema_changed)
+        set_table(root_table);
+
+    // First check if any of the tables accessible from the root table were
+    // actually modified. This can be false if there were only insertions, or
+    // deletions which were not linked to by any row in the linking table
+    auto table_modified = [&](auto& tbl) {
+        return tbl.table_ndx < info.tables.size()
+            && !info.tables[tbl.table_ndx].modifications.empty();
+    };
+    if (!any_of(begin(m_related_tables), end(m_related_tables), table_modified)) {
+        return [](size_t) { return false; };
+    }
+    if (m_related_tables.size() == 1) {
+        auto& modifications = info.tables[m_related_tables[0].table_ndx].modifications;
+        return [&](size_t row) { return modifications.contains(row); };
+    }
+
+    return DeepChangeChecker(info, root_table, m_related_tables);
+}
+
+void DeepChangeChecker::find_related_tables(std::vector<RelatedTable>& out, Table const& table)
+{
+    auto table_ndx = table.get_index_in_group();
+    if (table_ndx == npos)
+        return;
+    if (any_of(begin(out), end(out), [=](auto& tbl) { return tbl.table_ndx == table_ndx; }))
+        return;
+
+    // We need to add this table to `out` before recurring so that the check
+    // above works, but we can't store a pointer to the thing being populated
+    // because the recursive calls may resize `out`, so instead look it up by
+    // index every time
+    size_t out_index = out.size();
+    out.push_back({table_ndx, {}});
+
+    for (size_t i = 0, count = table.get_column_count(); i != count; ++i) {
+        auto type = table.get_column_type(i);
+        if (type == type_Link || type == type_LinkList) {
+            out[out_index].links.push_back({i, type == type_LinkList});
+            find_related_tables(out, *table.get_link_target(i));
+        }
+    }
+}
+
+// Stores `info`, `root_table` and `related_tables` in the corresponding
+// members and caches the root table's index within its group.
+// m_root_modifications points at the root table's modification set inside
+// `info`, or is null when `info.tables` has no entry at the root table's index.
+DeepChangeChecker::DeepChangeChecker(TransactionChangeInfo const& info,
+                                     Table const& root_table,
+                                     std::vector<RelatedTable> const& related_tables)
+: m_info(info)
+, m_root_table(root_table)
+, m_root_table_ndx(root_table.get_index_in_group())
+, m_root_modifications(m_root_table_ndx < info.tables.size() ? &info.tables[m_root_table_ndx].modifications : nullptr)
+, m_related_tables(related_tables)
+{
+}
+
+// Returns true if any row reachable through the recorded outgoing link
+// columns of row `row_ndx` in `table` is reported as changed by check_row().
+// `table_ndx` selects the table's entry in m_related_tables, and `depth` is
+// the slot of m_current_path used for this row.
+bool DeepChangeChecker::check_outgoing_links(size_t table_ndx,
+                                             Table const& table,
+                                             size_t row_ndx, size_t depth)
+{
+    auto it = find_if(begin(m_related_tables), end(m_related_tables),
+                      [&](auto&& tbl) { return tbl.table_ndx == table_ndx; });
+    // Without an entry for this table there are no recorded links to follow
+    if (it == m_related_tables.end())
+        return false;
+
+    // Check if we're already checking if the destination of the link is
+    // modified, and if not add it to the stack
+    auto already_checking = [&](size_t col) {
+        // Only path entries [0, depth) are examined
+        auto end = m_current_path.begin() + depth;
+        auto match = std::find_if(m_current_path.begin(), end, [&](auto& p) {
+            return p.table == table_ndx && p.row == row_ndx && p.col == col;
+        });
+        if (match != end) {
+            // The same (table, row, column) is already on the path. Flag every
+            // entry from the match onwards so that check_row() does not record
+            // those rows as unmodified.
+            for (; match < end; ++match) match->depth_exceeded = true;
+            return true;
+        }
+        // Record this link as the path entry for the current depth
+        m_current_path[depth] = {table_ndx, row_ndx, col, false};
+        return false;
+    };
+
+    auto linked_object_changed = [&](OutgoingLink const& link) {
+        if (already_checking(link.col_ndx))
+            return false;
+        if (!link.is_list) {
+            // Single link: a null link has no target row to inspect
+            if (table.is_null_link(link.col_ndx, row_ndx))
+                return false;
+            auto dst = table.get_link(link.col_ndx, row_ndx);
+            return check_row(*table.get_link_target(link.col_ndx), dst, depth + 1);
+        }
+
+        // Link list: stop at the first target row reported as changed
+        auto& target = *table.get_link_target(link.col_ndx);
+        auto lvr = table.get_linklist(link.col_ndx, row_ndx);
+        for (size_t j = 0, size = lvr->size(); j < size; ++j) {
+            size_t dst = lvr->get(j).get_index();
+            if (check_row(target, dst, depth + 1))
+                return true;
+        }
+        return false;
+    };
+
+    return std::any_of(begin(it->links), end(it->links), linked_object_changed);
+}
+
+// Returns true if the row at `idx` in `table` is considered changed at search
+// depth `depth`. For depth > 0 the row's own entry in m_info's modification
+// set is consulted first; at depth 0 only its outgoing links are followed
+// (operator() checks the root row's own modifications before calling this).
+// Rows found to be unchanged are remembered in m_not_modified.
+bool DeepChangeChecker::check_row(Table const& table, size_t idx, size_t depth)
+{
+    // Arbitrary upper limit on the maximum depth to search
+    if (depth >= m_current_path.size()) {
+        // Don't mark any of the intermediate rows checked along the path as
+        // not modified, as a search starting from them might hit a modification
+        for (size_t i = 0; i < m_current_path.size(); ++i)
+            m_current_path[i].depth_exceeded = true;
+        return false;
+    }
+
+    size_t table_ndx = table.get_index_in_group();
+    if (depth > 0 && table_ndx < m_info.tables.size() && m_info.tables[table_ndx].modifications.contains(idx))
+        return true;
+
+    // m_not_modified is indexed by table index and grown on demand
+    if (m_not_modified.size() <= table_ndx)
+        m_not_modified.resize(table_ndx + 1);
+    if (m_not_modified[table_ndx].contains(idx))
+        return false;
+
+    bool ret = check_outgoing_links(table_ndx, table, idx, depth);
+    // Only cache a negative result when the path entry leading to this row was
+    // not flagged depth_exceeded (the root row at depth 0 has no such entry)
+    if (!ret && (depth == 0 || !m_current_path[depth - 1].depth_exceeded))
+        m_not_modified[table_ndx].add(idx);
+    return ret;
+}
+
+// Reports whether the root-table row at `ndx` is modified, either directly
+// (it appears in the root table's modification set, when there is one) or
+// according to check_row() starting at depth 0.
+bool DeepChangeChecker::operator()(size_t ndx)
+{
+    bool const directly_modified = m_root_modifications && m_root_modifications->contains(ndx);
+    return directly_modified || check_row(m_root_table, ndx, 0);
+}
+
+// Takes ownership of the given Realm reference and records the version of the
+// current transaction of that Realm's shared group as the starting version.
+CollectionNotifier::CollectionNotifier(std::shared_ptr<Realm> realm)
+: m_realm(std::move(realm))
+, m_sg_version(Realm::Internal::get_shared_group(*m_realm)->get_version_of_current_transaction())
+{
+}
+
+// Releases the Realm reference through unregister(), which clears m_realm
+// while holding m_realm_mutex.
+CollectionNotifier::~CollectionNotifier()
+{
+    // Need to do this explicitly to ensure m_realm is destroyed with the mutex
+    // held to avoid potential double-deletion
+    unregister();
+}
+
+// Registers `callback` and returns the token which identifies it in later
+// calls to remove_callback() and suppress_next_notification(). The Realm's
+// thread check is performed before anything is modified.
+uint64_t CollectionNotifier::add_callback(CollectionChangeCallback callback)
+{
+    m_realm->verify_thread();
+
+    std::lock_guard<std::mutex> lock(m_callback_mutex);
+    auto token = m_next_token++;
+    m_callbacks.push_back({std::move(callback), {}, {}, token, false, false});
+
+    // While notifications are already being sent there is no need to wake
+    // up the worker
+    bool const currently_delivering = m_callback_index != npos;
+    if (currently_delivering)
+        return token;
+
+    Realm::Internal::get_coordinator(*m_realm).wake_up_notifier_worker();
+    m_have_callbacks = true;
+    return token;
+}
+
+// Removes the callback identified by `token`, if find_callback() locates it.
+// The delivery cursor and count are adjusted so that an in-progress
+// for_each_callback() loop neither skips nor repeats an entry.
+void CollectionNotifier::remove_callback(uint64_t token)
+{
+    // the callback needs to be destroyed after releasing the lock as destroying
+    // it could cause user code to be called
+    Callback old;
+    {
+        std::lock_guard<std::mutex> lock(m_callback_mutex);
+        auto it = find_callback(token);
+        if (it == end(m_callbacks)) {
+            return;
+        }
+
+        size_t idx = distance(begin(m_callbacks), it);
+        // m_callback_index != npos means a delivery loop is in progress; when
+        // the removed entry is at or before the cursor, step the cursor back
+        // so that the loop's next increment lands on the right element
+        if (m_callback_index != npos) {
+            if (m_callback_index >= idx)
+                --m_callback_index;
+        }
+        // NOTE(review): decremented even when no delivery is in progress;
+        // presumably harmless because the count is reassigned before each
+        // delivery pass -- confirm it cannot be observed after wrapping
+        --m_callback_count;
+
+        // Move the entry out so that its destructor runs after the lock scope
+        old = std::move(*it);
+        m_callbacks.erase(it);
+
+        m_have_callbacks = !m_callbacks.empty();
+    }
+}
+
+// Marks the callback identified by `token` so that add_changes() skips it
+// once. The Realm's thread and write-transaction checks are run first.
+void CollectionNotifier::suppress_next_notification(uint64_t token)
+{
+    {
+        // The Realm checks run under the realm mutex, which is released
+        // again before the callback mutex is taken
+        std::lock_guard<std::mutex> realm_lock(m_realm_mutex);
+        REALM_ASSERT(m_realm);
+        m_realm->verify_thread();
+        m_realm->verify_in_write();
+    }
+
+    std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
+    auto const pos = find_callback(token);
+    if (pos == m_callbacks.end())
+        return;
+    pos->skip_next = true;
+}
+
+// Looks up the callback registered under `token`. Returns the end iterator
+// when there is no such callback, which is asserted to only happen once an
+// error has been delivered. Takes no lock itself; the callers in this file
+// hold m_callback_mutex.
+std::vector<CollectionNotifier::Callback>::iterator CollectionNotifier::find_callback(uint64_t token)
+{
+    REALM_ASSERT(m_error || m_callbacks.size() > 0);
+
+    auto const has_token = [token](auto const& candidate) {
+        return candidate.token == token;
+    };
+    auto it = std::find_if(m_callbacks.begin(), m_callbacks.end(), has_token);
+
+    // A missing callback is only expected if it was removed due to an error
+    REALM_ASSERT(m_error || it != end(m_callbacks));
+    return it;
+}
+
+// Drops this notifier's reference to its Realm while holding m_realm_mutex.
+// Afterwards is_alive() reports false.
+void CollectionNotifier::unregister() noexcept
+{
+    std::lock_guard<std::mutex> realm_lock(m_realm_mutex);
+    m_realm.reset();
+}
+
+// Reports whether the notifier still holds a Realm reference, i.e. whether
+// unregister() has not been called yet. Reads m_realm under m_realm_mutex.
+bool CollectionNotifier::is_alive() const noexcept
+{
+    std::lock_guard<std::mutex> realm_lock(m_realm_mutex);
+    return static_cast<bool>(m_realm);
+}
+
+// Acquires m_realm_mutex and hands the owning lock to the caller, who keeps
+// the mutex held for as long as the returned object lives.
+std::unique_lock<std::mutex> CollectionNotifier::lock_target()
+{
+    std::unique_lock<std::mutex> realm_lock(m_realm_mutex);
+    return realm_lock;
+}
+
+// Rebuilds m_related_tables from scratch: the list is emptied and then
+// repopulated with `table` and every table reachable from it through links.
+void CollectionNotifier::set_table(Table const& table)
+{
+    m_related_tables.clear();
+    DeepChangeChecker::find_related_tables(m_related_tables, table);
+}
+
+void CollectionNotifier::add_required_change_info(TransactionChangeInfo& info)
+{
+    if (!do_add_required_change_info(info) || m_related_tables.empty()) {
+        return;
+    }
+
+    auto max = max_element(begin(m_related_tables), end(m_related_tables),
+                           [](auto&& a, auto&& b) { return a.table_ndx < b.table_ndx; });
+
+    if (max->table_ndx >= info.table_modifications_needed.size())
+        info.table_modifications_needed.resize(max->table_ndx + 1, false);
+    for (auto& tbl : m_related_tables) {
+        info.table_modifications_needed[tbl.table_ndx] = true;
+    }
+}
+
+// Records the version of the attached shared group's current transaction,
+// lets the subclass hook do_prepare_handover() run, and marks the notifier as
+// having run. Requires a shared group to be attached (see attach_to()).
+void CollectionNotifier::prepare_handover()
+{
+    REALM_ASSERT(m_sg);
+    m_sg_version = m_sg->get_version_of_current_transaction();
+    do_prepare_handover(*m_sg);
+    m_has_run = true;
+
+#ifdef REALM_DEBUG
+    // Debug-only check that no callback still has skip_next set at this
+    // point; add_changes() clears the flag when it skips a callback
+    std::lock_guard<std::mutex> lock(m_callback_mutex);
+    for (auto& callback : m_callbacks)
+        REALM_ASSERT(!callback.skip_next);
+#endif
+}
+
+void CollectionNotifier::before_advance()
+{
+    for_each_callback([&](auto& lock, auto& callback) {
+        if (callback.changes_to_deliver.empty()) {
+            return;
+        }
+
+        auto changes = callback.changes_to_deliver;
+        // acquire a local reference to the callback so that removing the
+        // callback from within it can't result in a dangling pointer
+        auto cb = callback.fn;
+        lock.unlock();
+        cb.before(changes);
+    });
+}
+
+void CollectionNotifier::after_advance()
+{
+    for_each_callback([&](auto& lock, auto& callback) {
+        if (callback.initial_delivered && callback.changes_to_deliver.empty()) {
+            return;
+        }
+        callback.initial_delivered = true;
+
+        auto changes = std::move(callback.changes_to_deliver);
+        // acquire a local reference to the callback so that removing the
+        // callback from within it can't result in a dangling pointer
+        auto cb = callback.fn;
+        lock.unlock();
+        cb.after(changes);
+    });
+}
+
+// Passes `error` to the `error` handler of every registered callback and then
+// removes that callback, so no callback is invoked again afterwards.
+void CollectionNotifier::deliver_error(std::exception_ptr error)
+{
+    // Don't complain about double-unregistering callbacks
+    m_error = true;
+
+    // Every currently registered callback takes part in this delivery pass
+    m_callback_count = m_callbacks.size();
+    for_each_callback([this, &error](auto& lock, auto& callback) {
+        // acquire a local reference to the callback so that removing the
+        // callback from within it can't result in a dangling pointer
+        auto cb = std::move(callback.fn);
+        // Copy the token before unlocking, as `callback` must not be touched
+        // once the lock has been released
+        auto token = callback.token;
+        lock.unlock();
+        cb.error(error);
+
+        // We never want to call the callback again after this, so just remove it
+        this->remove_callback(token);
+    });
+}
+
+// Reports whether `realm` is the very Realm instance this notifier holds a
+// reference to (identity comparison, made under m_realm_mutex).
+bool CollectionNotifier::is_for_realm(Realm& realm) const noexcept
+{
+    std::lock_guard<std::mutex> realm_lock(m_realm_mutex);
+    return &realm == m_realm.get();
+}
+
+// Turns every callback's accumulated changes into its changes-to-deliver and
+// fixes the number of callbacks taking part in the next delivery pass.
+// Returns false, without touching the callbacks, when prepare_to_deliver()
+// reports that there is nothing to deliver.
+bool CollectionNotifier::package_for_delivery()
+{
+    if (!prepare_to_deliver()) {
+        return false;
+    }
+
+    std::lock_guard<std::mutex> l(m_callback_mutex);
+    for (auto entry = m_callbacks.begin(); entry != m_callbacks.end(); ++entry) {
+        entry->changes_to_deliver = std::move(entry->accumulated_changes).finalize();
+    }
+    m_callback_count = m_callbacks.size();
+    return true;
+}
+
+// Calls `fn(lock, callback)` for each of the first m_callback_count callbacks
+// with m_callback_mutex held. `fn` may release the lock (e.g. to run user
+// code); it is re-acquired before moving on. m_callback_index is a member
+// rather than a local so that remove_callback() can adjust it when callbacks
+// are removed during the loop.
+template<typename Fn>
+void CollectionNotifier::for_each_callback(Fn&& fn)
+{
+    std::unique_lock<std::mutex> callback_lock(m_callback_mutex);
+    REALM_ASSERT_DEBUG(m_callback_count <= m_callbacks.size());
+    // The index is npos between passes; presumably npos is the maximum
+    // size_t value, so the initial pre-increment wraps it around to 0
+    for (++m_callback_index; m_callback_index < m_callback_count; ++m_callback_index) {
+        fn(callback_lock, m_callbacks[m_callback_index]);
+        if (!callback_lock.owns_lock())
+            callback_lock.lock();
+    }
+
+    // Signal that no delivery pass is in progress any more
+    m_callback_index = npos;
+}
+
+// Attaches the notifier to `sg` and runs the do_attach_to() hook. Must not
+// be called while a shared group is already attached.
+void CollectionNotifier::attach_to(SharedGroup& sg)
+{
+    REALM_ASSERT(!m_sg);
+
+    m_sg = &sg;
+    do_attach_to(*m_sg);
+}
+
+// Runs the do_detach_from() hook for the attached shared group and then
+// forgets it. A shared group must currently be attached.
+void CollectionNotifier::detach()
+{
+    REALM_ASSERT(m_sg);
+    SharedGroup& attached = *m_sg;
+    do_detach_from(attached);
+    m_sg = nullptr;
+}
+
+// Returns the shared group belonging to the Realm this notifier was created
+// for. m_realm is dereferenced without a null check.
+SharedGroup& CollectionNotifier::source_shared_group()
+{
+    auto&& shared_group = Realm::Internal::get_shared_group(*m_realm);
+    return *shared_group;
+}
+
+void CollectionNotifier::add_changes(CollectionChangeBuilder change)
+{
+    std::lock_guard<std::mutex> lock(m_callback_mutex);
+    for (auto& callback : m_callbacks) {
+        if (callback.skip_next) {
+            REALM_ASSERT_DEBUG(callback.accumulated_changes.empty());
+            callback.skip_next = false;
+        }
+        else {
+            if (&callback == &m_callbacks.back())
+                callback.accumulated_changes.merge(std::move(change));
+            else
+                callback.accumulated_changes.merge(CollectionChangeBuilder(change));
+        }
+    }
+}
+
+// Stores the notifiers to deliver, the coordinator used by package_and_wait()
+// and add_notifier(), and an optional error. When `error` is set, deliver()
+// reports it to every notifier instead of delivering changes.
+NotifierPackage::NotifierPackage(std::exception_ptr error,
+                                 std::vector<std::shared_ptr<CollectionNotifier>> notifiers,
+                                 RealmCoordinator* coordinator)
+: m_notifiers(std::move(notifiers))
+, m_coordinator(coordinator)
+, m_error(std::move(error))
+{
+}
+
+// Waits through the coordinator until the notifiers are ready, then packages
+// those which have something to deliver and drops the others. With a
+// `target_version`, the wait lasts until every notifier which has callbacks
+// has run and reached at least that version. Does nothing when there is no
+// coordinator, an error is pending, or the package tests false.
+void NotifierPackage::package_and_wait(util::Optional<VersionID::version_type> target_version)
+{
+    if (!m_coordinator || m_error || !*this)
+        return;
+
+    // The returned object is kept alive until the end of this function
+    // (presumably a lock guarding the notifiers -- confirm in RealmCoordinator)
+    auto lock = m_coordinator->wait_for_notifiers([&] {
+        // Without a target version there is nothing to wait for
+        if (!target_version)
+            return true;
+        return std::all_of(begin(m_notifiers), end(m_notifiers), [&](auto const& n) {
+            return !n->have_callbacks() || (n->has_run() && n->version().version >= *target_version);
+        });
+    });
+
+    // Package the notifiers for delivery and remove any which don't have anything to deliver
+    auto package = [&](auto& notifier) {
+        if (notifier->has_run() && notifier->package_for_delivery()) {
+            // Ends up holding the version of the last notifier packaged
+            m_version = notifier->version();
+            return false;
+        }
+        // true tells remove_if to drop this notifier
+        return true;
+    };
+    m_notifiers.erase(std::remove_if(begin(m_notifiers), end(m_notifiers), package), end(m_notifiers));
+    // If the packaged version is older than the requested one, deliver nothing
+    if (m_version && target_version && m_version->version < *target_version) {
+        m_notifiers.clear();
+        m_version = util::none;
+    }
+    REALM_ASSERT(m_version || m_notifiers.empty());
+
+    // Makes any later call to this function return immediately
+    m_coordinator = nullptr;
+}
+
+// Forwards before_advance() to every packaged notifier, unless the package
+// carries an error.
+void NotifierPackage::before_advance()
+{
+    if (!m_error) {
+        for (auto const& notifier : m_notifiers) {
+            notifier->before_advance();
+        }
+    }
+}
+
+void NotifierPackage::deliver(SharedGroup& sg)
+{
+    if (m_error) {
+        for (auto& notifier : m_notifiers)
+            notifier->deliver_error(m_error);
+        return;
+    }
+    // Can't deliver while in a write transaction
+    if (sg.get_transact_stage() != SharedGroup::transact_Reading)
+        return;
+    for (auto& notifier : m_notifiers)
+        notifier->deliver(sg);
+}
+
+// Forwards after_advance() to every packaged notifier, unless the package
+// carries an error.
+void NotifierPackage::after_advance()
+{
+    if (!m_error) {
+        for (auto const& notifier : m_notifiers) {
+            notifier->after_advance();
+        }
+    }
+}
+
+// Adds `notifier` to this package and registers it with the coordinator.
+// NOTE(review): m_coordinator is dereferenced unconditionally, yet
+// package_and_wait() sets it to null when it completes and the constructor
+// accepts a null coordinator -- confirm callers only use this beforehand.
+void NotifierPackage::add_notifier(std::shared_ptr<CollectionNotifier> notifier)
+{
+    m_notifiers.push_back(notifier);
+    m_coordinator->register_notifier(notifier);
+}