added iOS source code
[wl-app.git] / iOS / Pods / Realm / Realm / ObjectStore / src / impl / collection_notifier.cpp
1 ////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright 2016 Realm Inc.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 ////////////////////////////////////////////////////////////////////////////
18
19 #include "impl/collection_notifier.hpp"
20
21 #include "impl/realm_coordinator.hpp"
22 #include "shared_realm.hpp"
23
24 #include <realm/group_shared.hpp>
25 #include <realm/link_view.hpp>
26
27 using namespace realm;
28 using namespace realm::_impl;
29
30 std::function<bool (size_t)>
31 CollectionNotifier::get_modification_checker(TransactionChangeInfo const& info,
32                                              Table const& root_table)
33 {
34     if (info.schema_changed)
35         set_table(root_table);
36
37     // First check if any of the tables accessible from the root table were
38     // actually modified. This can be false if there were only insertions, or
39     // deletions which were not linked to by any row in the linking table
40     auto table_modified = [&](auto& tbl) {
41         return tbl.table_ndx < info.tables.size()
42             && !info.tables[tbl.table_ndx].modifications.empty();
43     };
44     if (!any_of(begin(m_related_tables), end(m_related_tables), table_modified)) {
45         return [](size_t) { return false; };
46     }
47     if (m_related_tables.size() == 1) {
48         auto& modifications = info.tables[m_related_tables[0].table_ndx].modifications;
49         return [&](size_t row) { return modifications.contains(row); };
50     }
51
52     return DeepChangeChecker(info, root_table, m_related_tables);
53 }
54
55 void DeepChangeChecker::find_related_tables(std::vector<RelatedTable>& out, Table const& table)
56 {
57     auto table_ndx = table.get_index_in_group();
58     if (table_ndx == npos)
59         return;
60     if (any_of(begin(out), end(out), [=](auto& tbl) { return tbl.table_ndx == table_ndx; }))
61         return;
62
63     // We need to add this table to `out` before recurring so that the check
64     // above works, but we can't store a pointer to the thing being populated
65     // because the recursive calls may resize `out`, so instead look it up by
66     // index every time
67     size_t out_index = out.size();
68     out.push_back({table_ndx, {}});
69
70     for (size_t i = 0, count = table.get_column_count(); i != count; ++i) {
71         auto type = table.get_column_type(i);
72         if (type == type_Link || type == type_LinkList) {
73             out[out_index].links.push_back({i, type == type_LinkList});
74             find_related_tables(out, *table.get_link_target(i));
75         }
76     }
77 }
78
79 DeepChangeChecker::DeepChangeChecker(TransactionChangeInfo const& info,
80                                      Table const& root_table,
81                                      std::vector<RelatedTable> const& related_tables)
82 : m_info(info)
83 , m_root_table(root_table)
84 , m_root_table_ndx(root_table.get_index_in_group())
85 , m_root_modifications(m_root_table_ndx < info.tables.size() ? &info.tables[m_root_table_ndx].modifications : nullptr)
86 , m_related_tables(related_tables)
87 {
88 }
89
90 bool DeepChangeChecker::check_outgoing_links(size_t table_ndx,
91                                              Table const& table,
92                                              size_t row_ndx, size_t depth)
93 {
94     auto it = find_if(begin(m_related_tables), end(m_related_tables),
95                       [&](auto&& tbl) { return tbl.table_ndx == table_ndx; });
96     if (it == m_related_tables.end())
97         return false;
98
99     // Check if we're already checking if the destination of the link is
100     // modified, and if not add it to the stack
101     auto already_checking = [&](size_t col) {
102         auto end = m_current_path.begin() + depth;
103         auto match = std::find_if(m_current_path.begin(), end, [&](auto& p) {
104             return p.table == table_ndx && p.row == row_ndx && p.col == col;
105         });
106         if (match != end) {
107             for (; match < end; ++match) match->depth_exceeded = true;
108             return true;
109         }
110         m_current_path[depth] = {table_ndx, row_ndx, col, false};
111         return false;
112     };
113
114     auto linked_object_changed = [&](OutgoingLink const& link) {
115         if (already_checking(link.col_ndx))
116             return false;
117         if (!link.is_list) {
118             if (table.is_null_link(link.col_ndx, row_ndx))
119                 return false;
120             auto dst = table.get_link(link.col_ndx, row_ndx);
121             return check_row(*table.get_link_target(link.col_ndx), dst, depth + 1);
122         }
123
124         auto& target = *table.get_link_target(link.col_ndx);
125         auto lvr = table.get_linklist(link.col_ndx, row_ndx);
126         for (size_t j = 0, size = lvr->size(); j < size; ++j) {
127             size_t dst = lvr->get(j).get_index();
128             if (check_row(target, dst, depth + 1))
129                 return true;
130         }
131         return false;
132     };
133
134     return std::any_of(begin(it->links), end(it->links), linked_object_changed);
135 }
136
137 bool DeepChangeChecker::check_row(Table const& table, size_t idx, size_t depth)
138 {
139     // Arbitrary upper limit on the maximum depth to search
140     if (depth >= m_current_path.size()) {
141         // Don't mark any of the intermediate rows checked along the path as
142         // not modified, as a search starting from them might hit a modification
143         for (size_t i = 0; i < m_current_path.size(); ++i)
144             m_current_path[i].depth_exceeded = true;
145         return false;
146     }
147
148     size_t table_ndx = table.get_index_in_group();
149     if (depth > 0 && table_ndx < m_info.tables.size() && m_info.tables[table_ndx].modifications.contains(idx))
150         return true;
151
152     if (m_not_modified.size() <= table_ndx)
153         m_not_modified.resize(table_ndx + 1);
154     if (m_not_modified[table_ndx].contains(idx))
155         return false;
156
157     bool ret = check_outgoing_links(table_ndx, table, idx, depth);
158     if (!ret && (depth == 0 || !m_current_path[depth - 1].depth_exceeded))
159         m_not_modified[table_ndx].add(idx);
160     return ret;
161 }
162
163 bool DeepChangeChecker::operator()(size_t ndx)
164 {
165     if (m_root_modifications && m_root_modifications->contains(ndx))
166         return true;
167     return check_row(m_root_table, ndx, 0);
168 }
169
170 CollectionNotifier::CollectionNotifier(std::shared_ptr<Realm> realm)
171 : m_realm(std::move(realm))
172 , m_sg_version(Realm::Internal::get_shared_group(*m_realm)->get_version_of_current_transaction())
173 {
174 }
175
176 CollectionNotifier::~CollectionNotifier()
177 {
178     // Need to do this explicitly to ensure m_realm is destroyed with the mutex
179     // held to avoid potential double-deletion
180     unregister();
181 }
182
183 uint64_t CollectionNotifier::add_callback(CollectionChangeCallback callback)
184 {
185     m_realm->verify_thread();
186
187     std::lock_guard<std::mutex> lock(m_callback_mutex);
188     auto token = m_next_token++;
189     m_callbacks.push_back({std::move(callback), {}, {}, token, false, false});
190     if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
191         Realm::Internal::get_coordinator(*m_realm).wake_up_notifier_worker();
192         m_have_callbacks = true;
193     }
194     return token;
195 }
196
197 void CollectionNotifier::remove_callback(uint64_t token)
198 {
199     // the callback needs to be destroyed after releasing the lock as destroying
200     // it could cause user code to be called
201     Callback old;
202     {
203         std::lock_guard<std::mutex> lock(m_callback_mutex);
204         auto it = find_callback(token);
205         if (it == end(m_callbacks)) {
206             return;
207         }
208
209         size_t idx = distance(begin(m_callbacks), it);
210         if (m_callback_index != npos) {
211             if (m_callback_index >= idx)
212                 --m_callback_index;
213         }
214         --m_callback_count;
215
216         old = std::move(*it);
217         m_callbacks.erase(it);
218
219         m_have_callbacks = !m_callbacks.empty();
220     }
221 }
222
223 void CollectionNotifier::suppress_next_notification(uint64_t token)
224 {
225     {
226         std::lock_guard<std::mutex> lock(m_realm_mutex);
227         REALM_ASSERT(m_realm);
228         m_realm->verify_thread();
229         m_realm->verify_in_write();
230     }
231
232     std::lock_guard<std::mutex> lock(m_callback_mutex);
233     auto it = find_callback(token);
234     if (it != end(m_callbacks)) {
235         it->skip_next = true;
236     }
237 }
238
239 std::vector<CollectionNotifier::Callback>::iterator CollectionNotifier::find_callback(uint64_t token)
240 {
241     REALM_ASSERT(m_error || m_callbacks.size() > 0);
242
243     auto it = find_if(begin(m_callbacks), end(m_callbacks),
244                       [=](const auto& c) { return c.token == token; });
245     // We should only fail to find the callback if it was removed due to an error
246     REALM_ASSERT(m_error || it != end(m_callbacks));
247     return it;
248 }
249
250 void CollectionNotifier::unregister() noexcept
251 {
252     std::lock_guard<std::mutex> lock(m_realm_mutex);
253     m_realm = nullptr;
254 }
255
256 bool CollectionNotifier::is_alive() const noexcept
257 {
258     std::lock_guard<std::mutex> lock(m_realm_mutex);
259     return m_realm != nullptr;
260 }
261
262 std::unique_lock<std::mutex> CollectionNotifier::lock_target()
263 {
264     return std::unique_lock<std::mutex>{m_realm_mutex};
265 }
266
267 void CollectionNotifier::set_table(Table const& table)
268 {
269     m_related_tables.clear();
270     DeepChangeChecker::find_related_tables(m_related_tables, table);
271 }
272
273 void CollectionNotifier::add_required_change_info(TransactionChangeInfo& info)
274 {
275     if (!do_add_required_change_info(info) || m_related_tables.empty()) {
276         return;
277     }
278
279     auto max = max_element(begin(m_related_tables), end(m_related_tables),
280                            [](auto&& a, auto&& b) { return a.table_ndx < b.table_ndx; });
281
282     if (max->table_ndx >= info.table_modifications_needed.size())
283         info.table_modifications_needed.resize(max->table_ndx + 1, false);
284     for (auto& tbl : m_related_tables) {
285         info.table_modifications_needed[tbl.table_ndx] = true;
286     }
287 }
288
289 void CollectionNotifier::prepare_handover()
290 {
291     REALM_ASSERT(m_sg);
292     m_sg_version = m_sg->get_version_of_current_transaction();
293     do_prepare_handover(*m_sg);
294     m_has_run = true;
295
296 #ifdef REALM_DEBUG
297     std::lock_guard<std::mutex> lock(m_callback_mutex);
298     for (auto& callback : m_callbacks)
299         REALM_ASSERT(!callback.skip_next);
300 #endif
301 }
302
303 void CollectionNotifier::before_advance()
304 {
305     for_each_callback([&](auto& lock, auto& callback) {
306         if (callback.changes_to_deliver.empty()) {
307             return;
308         }
309
310         auto changes = callback.changes_to_deliver;
311         // acquire a local reference to the callback so that removing the
312         // callback from within it can't result in a dangling pointer
313         auto cb = callback.fn;
314         lock.unlock();
315         cb.before(changes);
316     });
317 }
318
319 void CollectionNotifier::after_advance()
320 {
321     for_each_callback([&](auto& lock, auto& callback) {
322         if (callback.initial_delivered && callback.changes_to_deliver.empty()) {
323             return;
324         }
325         callback.initial_delivered = true;
326
327         auto changes = std::move(callback.changes_to_deliver);
328         // acquire a local reference to the callback so that removing the
329         // callback from within it can't result in a dangling pointer
330         auto cb = callback.fn;
331         lock.unlock();
332         cb.after(changes);
333     });
334 }
335
336 void CollectionNotifier::deliver_error(std::exception_ptr error)
337 {
338     // Don't complain about double-unregistering callbacks
339     m_error = true;
340
341     m_callback_count = m_callbacks.size();
342     for_each_callback([this, &error](auto& lock, auto& callback) {
343         // acquire a local reference to the callback so that removing the
344         // callback from within it can't result in a dangling pointer
345         auto cb = std::move(callback.fn);
346         auto token = callback.token;
347         lock.unlock();
348         cb.error(error);
349
350         // We never want to call the callback again after this, so just remove it
351         this->remove_callback(token);
352     });
353 }
354
355 bool CollectionNotifier::is_for_realm(Realm& realm) const noexcept
356 {
357     std::lock_guard<std::mutex> lock(m_realm_mutex);
358     return m_realm.get() == &realm;
359 }
360
361 bool CollectionNotifier::package_for_delivery()
362 {
363     if (!prepare_to_deliver())
364         return false;
365     std::lock_guard<std::mutex> l(m_callback_mutex);
366     for (auto& callback : m_callbacks)
367         callback.changes_to_deliver = std::move(callback.accumulated_changes).finalize();
368     m_callback_count = m_callbacks.size();
369     return true;
370 }
371
372 template<typename Fn>
373 void CollectionNotifier::for_each_callback(Fn&& fn)
374 {
375     std::unique_lock<std::mutex> callback_lock(m_callback_mutex);
376     REALM_ASSERT_DEBUG(m_callback_count <= m_callbacks.size());
377     for (++m_callback_index; m_callback_index < m_callback_count; ++m_callback_index) {
378         fn(callback_lock, m_callbacks[m_callback_index]);
379         if (!callback_lock.owns_lock())
380             callback_lock.lock();
381     }
382
383     m_callback_index = npos;
384 }
385
386 void CollectionNotifier::attach_to(SharedGroup& sg)
387 {
388     REALM_ASSERT(!m_sg);
389
390     m_sg = &sg;
391     do_attach_to(sg);
392 }
393
394 void CollectionNotifier::detach()
395 {
396     REALM_ASSERT(m_sg);
397     do_detach_from(*m_sg);
398     m_sg = nullptr;
399 }
400
401 SharedGroup& CollectionNotifier::source_shared_group()
402 {
403     return *Realm::Internal::get_shared_group(*m_realm);
404 }
405
406 void CollectionNotifier::add_changes(CollectionChangeBuilder change)
407 {
408     std::lock_guard<std::mutex> lock(m_callback_mutex);
409     for (auto& callback : m_callbacks) {
410         if (callback.skip_next) {
411             REALM_ASSERT_DEBUG(callback.accumulated_changes.empty());
412             callback.skip_next = false;
413         }
414         else {
415             if (&callback == &m_callbacks.back())
416                 callback.accumulated_changes.merge(std::move(change));
417             else
418                 callback.accumulated_changes.merge(CollectionChangeBuilder(change));
419         }
420     }
421 }
422
423 NotifierPackage::NotifierPackage(std::exception_ptr error,
424                                  std::vector<std::shared_ptr<CollectionNotifier>> notifiers,
425                                  RealmCoordinator* coordinator)
426 : m_notifiers(std::move(notifiers))
427 , m_coordinator(coordinator)
428 , m_error(std::move(error))
429 {
430 }
431
432 void NotifierPackage::package_and_wait(util::Optional<VersionID::version_type> target_version)
433 {
434     if (!m_coordinator || m_error || !*this)
435         return;
436
437     auto lock = m_coordinator->wait_for_notifiers([&] {
438         if (!target_version)
439             return true;
440         return std::all_of(begin(m_notifiers), end(m_notifiers), [&](auto const& n) {
441             return !n->have_callbacks() || (n->has_run() && n->version().version >= *target_version);
442         });
443     });
444
445     // Package the notifiers for delivery and remove any which don't have anything to deliver
446     auto package = [&](auto& notifier) {
447         if (notifier->has_run() && notifier->package_for_delivery()) {
448             m_version = notifier->version();
449             return false;
450         }
451         return true;
452     };
453     m_notifiers.erase(std::remove_if(begin(m_notifiers), end(m_notifiers), package), end(m_notifiers));
454     if (m_version && target_version && m_version->version < *target_version) {
455         m_notifiers.clear();
456         m_version = util::none;
457     }
458     REALM_ASSERT(m_version || m_notifiers.empty());
459
460     m_coordinator = nullptr;
461 }
462
463 void NotifierPackage::before_advance()
464 {
465     if (m_error)
466         return;
467     for (auto& notifier : m_notifiers)
468         notifier->before_advance();
469 }
470
471 void NotifierPackage::deliver(SharedGroup& sg)
472 {
473     if (m_error) {
474         for (auto& notifier : m_notifiers)
475             notifier->deliver_error(m_error);
476         return;
477     }
478     // Can't deliver while in a write transaction
479     if (sg.get_transact_stage() != SharedGroup::transact_Reading)
480         return;
481     for (auto& notifier : m_notifiers)
482         notifier->deliver(sg);
483 }
484
485 void NotifierPackage::after_advance()
486 {
487     if (m_error)
488         return;
489     for (auto& notifier : m_notifiers)
490         notifier->after_advance();
491 }
492
493 void NotifierPackage::add_notifier(std::shared_ptr<CollectionNotifier> notifier)
494 {
495     m_notifiers.push_back(notifier);
496     m_coordinator->register_notifier(notifier);
497 }