1 ////////////////////////////////////////////////////////////////////////////
3 // Copyright 2015 Realm Inc.
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
17 ////////////////////////////////////////////////////////////////////////////
19 #include "impl/results_notifier.hpp"
21 #include "shared_realm.hpp"
23 using namespace realm;
24 using namespace realm::_impl;
26 ResultsNotifier::ResultsNotifier(Results& target)
27 : CollectionNotifier(target.get_realm())
28 , m_target_results(&target)
29 , m_target_is_in_table_order(target.is_in_table_order())
31 Query q = target.get_query();
32 set_table(*q.get_table());
33 m_query_handover = source_shared_group().export_for_handover(q, MutableSourcePayload::Move);
34 DescriptorOrdering::generate_patch(target.get_descriptor_ordering(), m_ordering_handover);
37 void ResultsNotifier::target_results_moved(Results& old_target, Results& new_target)
39 auto lock = lock_target();
41 REALM_ASSERT(m_target_results == &old_target);
42 m_target_results = &new_target;
45 void ResultsNotifier::release_data() noexcept
50 // Most of the inter-thread synchronization for run(), prepare_handover(),
51 // attach_to(), detach(), release_data() and deliver() is done by
52 // RealmCoordinator external to this code, which has some potentially
53 // non-obvious results on which members are and are not safe to use without
56 // add_required_change_info(), attach_to(), detach(), run(),
57 // prepare_handover(), and release_data() are all only ever called on a single
58 // background worker thread. call_callbacks() and deliver() are called on the
59 // target thread. Calls to prepare_handover() and deliver() are guarded by a
62 // In total, this means that the safe data flow is as follows:
63 // - add_Required_change_info(), prepare_handover(), attach_to(), detach() and
64 // release_data() can read members written by each other
65 // - deliver() can read members written to in prepare_handover(), deliver(),
66 // and call_callbacks()
67 // - call_callbacks() and read members written to in deliver()
69 // Separately from the handover data flow, m_target_results is guarded by the target lock
71 bool ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info)
73 REALM_ASSERT(m_query);
76 auto& table = *m_query->get_table();
77 if (!table.is_attached())
80 auto table_ndx = table.get_index_in_group();
81 if (table_ndx == npos) { // is a subtable
82 auto& parent = *table.get_parent_table();
83 size_t row_ndx = table.get_parent_row_index();
84 size_t col_ndx = find_container_column(parent, row_ndx, &table, type_Table, &Table::get_subtable);
85 info.lists.push_back({parent.get_index_in_group(), row_ndx, col_ndx, &m_changes});
87 else { // is a top-level table
88 if (info.table_moves_needed.size() <= table_ndx)
89 info.table_moves_needed.resize(table_ndx + 1);
90 info.table_moves_needed[table_ndx] = true;
93 return has_run() && have_callbacks();
96 bool ResultsNotifier::need_to_run()
99 REALM_ASSERT(!m_tv.is_attached());
102 auto lock = lock_target();
103 // Don't run the query if the results aren't actually going to be used
104 if (!get_realm() || (!have_callbacks() && !m_target_results->wants_background_updates())) {
109 // If we've run previously, check if we need to rerun
110 if (has_run() && m_query->sync_view_if_needed() == m_last_seen_version) {
117 void ResultsNotifier::calculate_changes()
119 size_t table_ndx = m_query->get_table()->get_index_in_group();
121 CollectionChangeBuilder* changes = nullptr;
122 if (table_ndx == npos)
123 changes = &m_changes;
124 else if (table_ndx < m_info->tables.size())
125 changes = &m_info->tables[table_ndx];
127 std::vector<size_t> next_rows;
128 next_rows.reserve(m_tv.size());
129 for (size_t i = 0; i < m_tv.size(); ++i)
130 next_rows.push_back(m_tv[i].get_index());
132 util::Optional<IndexSet> move_candidates;
134 auto const& moves = changes->moves;
135 for (auto& idx : m_previous_rows) {
136 if (changes->deletions.contains(idx)) {
137 // check if this deletion was actually a move
138 auto it = lower_bound(begin(moves), end(moves), idx,
139 [](auto const& a, auto b) { return a.from < b; });
140 idx = it != moves.end() && it->from == idx ? it->to : npos;
143 idx = changes->insertions.shift(changes->deletions.unshift(idx));
145 if (m_target_is_in_table_order && !m_descriptor_ordering.will_apply_sort())
146 move_candidates = changes->insertions;
149 m_changes = CollectionChangeBuilder::calculate(m_previous_rows, next_rows,
150 get_modification_checker(*m_info, *m_query->get_table()),
153 m_previous_rows = std::move(next_rows);
156 m_previous_rows.resize(m_tv.size());
157 for (size_t i = 0; i < m_tv.size(); ++i)
158 m_previous_rows[i] = m_tv[i].get_index();
162 void ResultsNotifier::run()
164 // Table's been deleted, so report all rows as deleted
165 if (!m_query->get_table()->is_attached()) {
167 m_changes.deletions.set(m_previous_rows.size());
168 m_previous_rows.clear();
175 m_query->sync_view_if_needed();
176 m_tv = m_query->find_all();
177 m_tv.apply_descriptor_ordering(m_descriptor_ordering);
178 m_last_seen_version = m_tv.sync_if_needed();
183 void ResultsNotifier::do_prepare_handover(SharedGroup& sg)
185 if (!m_tv.is_attached()) {
186 // if the table version didn't change we can just reuse the same handover
187 // object and bump its version to the current SG version
189 m_tv_handover->version = sg.get_version_of_current_transaction();
191 // add_changes() needs to be called even if there are no changes to
192 // clear the skip flag on the callbacks
193 add_changes(std::move(m_changes));
197 REALM_ASSERT(m_tv.is_in_sync());
199 m_tv_handover = sg.export_for_handover(m_tv, MutableSourcePayload::Move);
201 add_changes(std::move(m_changes));
202 REALM_ASSERT(m_changes.empty());
204 // detach the TableView as we won't need it again and keeping it around
205 // makes advance_read() much more expensive
209 void ResultsNotifier::deliver(SharedGroup& sg)
211 auto lock = lock_target();
213 // Target realm being null here indicates that we were unregistered while we
214 // were in the process of advancing the Realm version and preparing for
215 // delivery, i.e. the results was destroyed from the "wrong" thread
220 REALM_ASSERT(!m_query_handover);
221 if (m_tv_to_deliver) {
222 Results::Internal::set_table_view(*m_target_results,
223 std::move(*sg.import_from_handover(std::move(m_tv_to_deliver))));
225 REALM_ASSERT(!m_tv_to_deliver);
228 bool ResultsNotifier::prepare_to_deliver()
230 auto lock = lock_target();
233 m_tv_to_deliver = std::move(m_tv_handover);
237 void ResultsNotifier::do_attach_to(SharedGroup& sg)
239 REALM_ASSERT(m_query_handover);
240 m_query = sg.import_from_handover(std::move(m_query_handover));
241 m_descriptor_ordering = DescriptorOrdering::create_from_and_consume_patch(m_ordering_handover, *m_query->get_table());
244 void ResultsNotifier::do_detach_from(SharedGroup& sg)
246 REALM_ASSERT(m_query);
247 REALM_ASSERT(!m_tv.is_attached());
249 DescriptorOrdering::generate_patch(m_descriptor_ordering, m_ordering_handover);
250 m_query_handover = sg.export_for_handover(*m_query, MutableSourcePayload::Move);