2 * Copyright 2017 Google
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #import "FIRMessagingRmqManager.h"
21 #import "FIRMessagingDefines.h"
22 #import "FIRMessagingLogger.h"
23 #import "FIRMessagingRmq2PersistentStore.h"
24 #import "FIRMessagingUtilities.h"
25 #import "Protos/GtalkCore.pbobjc.h"
// Helper macro: sends logErrorAndFinalizeStatement: to `self` with `stmt`, then
// returns `return_value` from the enclosing method.
// NOTE(review): nothing in the visible part of this file uses this macro, and
// the lines that would normally wrap the macro body and close the #ifndef are
// not visible in this listing (embedded numbering skips 29, 32-34) -- confirm
// against the full file before relying on it.
27 #ifndef _FIRMessagingRmqLogAndExit
28 #define _FIRMessagingRmqLogAndExit(stmt, return_value) \
30 [self logErrorAndFinalizeStatement:stmt]; \
31 return return_value; \
35 static NSString *const kFCMRmqTag = @"FIRMessagingRmq:";
@interface FIRMessagingRmqManager ()

/// Persistent store that this manager forwards its queries, saves and deletes to.
@property(strong, readwrite, nonatomic) FIRMessagingRmq2PersistentStore *rmq2Store;

/// Maps the category of an outgoing message to the number of messages for that category.
/// Should always have two keys -- the app, gcm.
@property(strong, readwrite, nonatomic) NSMutableDictionary *outstandingMessages;

/// Outgoing RMQ persistent id.
@property(assign, readwrite, nonatomic) int64_t rmqId;

@end
49 @implementation FIRMessagingRmqManager
// Initializer that takes the name of the backing RMQ database.
// Visible steps: check that `databaseName` is non-empty with
// _FIRMessagingDevAssert, create the FIRMessagingRmq2PersistentStore for that
// database name, and create an empty mutable dictionary (initial capacity 2)
// for `outstandingMessages`.
// NOTE(review): lines appear to be missing around this method (embedded
// numbering jumps 51 -> 54 and 56 -> 63), so the usual super-init / return
// scaffolding and any further ivar setup are not visible -- confirm against
// the full file.
51 - (instancetype)initWithDatabaseName:(NSString *)databaseName {
54 _FIRMessagingDevAssert([databaseName length] > 0, @"RMQ: Invalid rmq db name");
55 _rmq2Store = [[FIRMessagingRmq2PersistentStore alloc] initWithDatabaseName:databaseName];
56 _outstandingMessages = [NSMutableDictionary dictionaryWithCapacity:2];
// NOTE(review): the method signature that owns the following statements is not
// visible in this listing (embedded numbering jumps from 56 to 63).
// Visible logic: when `self.rmqId` is already non-negative the work is treated
// as done and the method returns; otherwise loadInitialOutgoingPersistentId is
// invoked, and a debug line is logged when `outstandingMessages` is non-empty.
63 if (self.rmqId >= 0) {
64 return; // already done
67 [self loadInitialOutgoingPersistentId];
68 if (self.outstandingMessages.count) {
// NOTE(review): the format string uses %ld while the argument is wrapped in
// _FIRMessaging_UL, which presumably casts to unsigned long; if so, %lu would
// be the matching specifier -- confirm against FIRMessagingDefines.h.
69 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmqManager000,
70 @"%@: outstanding categories %ld", kFCMRmqTag,
71 _FIRMessaging_UL(self.outstandingMessages.count));
76 * Initialize the 'initial RMQ':
77 * - max ID of any message in the queue
78 * - if the queue is empty, stored value in separate DB.
80 * Stream acks will remove from RMQ, when we remove the highest message we keep track
// Computes the initial outgoing RMQ persistent id and stores it in `self.rmqId`.
// Visible steps: read the highest id present in the queue (queryHighestRmqId),
// read the separately stored last id (querylastRmqId), then set `self.rmqId`
// to that value plus one.
// NOTE(review): as shown, the second assignment always overwrites the first.
// The comments below describe the stored value as something to check only
// after the queue, and the embedded numbering skips 91 and 93, so a guarding
// condition has presumably been lost from this listing -- confirm against the
// full file.
83 - (void)loadInitialOutgoingPersistentId {
85 // we shouldn't always trust the lastRmqId stored in the LastRmqId table, because
86 // we only save to the LastRmqId table once in a while (after getting the lastRmqId sent
87 // by the server after reconnect, and after getting a rmq ack from the server). The
88 // rmq message with the highest rmq id tells the real story, so check against that first.
90 int64_t rmqId = [self queryHighestRmqId];
92 rmqId = [self querylastRmqId];
94 self.rmqId = rmqId + 1;
/**
 * Saves a message to RMQ2. Populates the message's rmq2 persistent ID first when it has none.
 *
 * @param message The proto to persist. When FIRMessagingGetRmq2Id() yields a nil or empty string
 *                for it, a fresh id is taken from -nextRmqId and written back onto the message
 *                with FIRMessagingSetRmq2Id().
 * @param error   Passed through unchanged to -saveMessage:withRmqId:tag:error:.
 * @return The BOOL result of -saveMessage:withRmqId:tag:error:.
 */
- (BOOL)saveRmqMessage:(GPBMessage *)message
                 error:(NSError **)error {
  // The wire format of the rmq2 id is a string, but the id is kept as a 64-bit integer in the
  // database, so the string form is only produced for the message itself.
  NSString *rmq2Id = FIRMessagingGetRmq2Id(message);
  if (![rmq2Id length]) {
    int64_t rmqId = [self nextRmqId];
    rmq2Id = [NSString stringWithFormat:@"%lld", rmqId];
    FIRMessagingSetRmq2Id(message, rmq2Id);
  }
  FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(message);
  // Parse with -longLongValue rather than -integerValue: NSInteger is only 32 bits wide on
  // 32-bit targets, which would truncate ids that were formatted from an int64_t above.
  int64_t rmqIdValue = [rmq2Id longLongValue];
  return [self saveMessage:message withRmqId:rmqIdValue tag:tag error:error];
}
// Serializes `message` with -data and hands the bytes to the persistent store
// together with `rmqId` and `tag`. `error` is forwarded to the store, and the
// store's BOOL result is returned.
// NOTE(review): `tag` is used below but its parameter declaration is not
// visible (embedded numbering skips 120) -- presumably a `tag:` keyword sits
// between `withRmqId:` and `error:`; confirm against the full file.
118 - (BOOL)saveMessage:(GPBMessage *)message
119 withRmqId:(int64_t)rmqId
121 error:(NSError **)error {
122 NSData *data = [message data];
123 return [self.rmq2Store saveMessageWithRmqId:rmqId tag:tag data:data error:error];
/**
 * This is called when we delete the largest outgoing message from queue.
 * Forwards `rmqID` to the persistent store's -updateLastOutgoingRmqId:.
 */
- (void)saveLastOutgoingRmqId:(int64_t)rmqID {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  [store updateLastOutgoingRmqId:rmqID];
}
/**
 * Records `rmqID` through the persistent store's -saveUnackedS2dMessageWithRmqId:.
 *
 * @return The BOOL result reported by the store.
 */
- (BOOL)saveS2dMessageWithRmqId:(NSString *)rmqID {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL didSave = [store saveUnackedS2dMessageWithRmqId:rmqID];
  return didSave;
}
/**
 * @return The value of the persistent store's -queryHighestRmqId.
 */
- (int64_t)queryHighestRmqId {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  int64_t highestRmqId = [store queryHighestRmqId];
  return highestRmqId;
}
/**
 * @return The value of the persistent store's -queryLastRmqId.
 */
- (int64_t)querylastRmqId {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  int64_t lastRmqId = [store queryLastRmqId];
  return lastRmqId;
}
/**
 * @return The array produced by the persistent store's -unackedS2dRmqIds.
 */
- (NSArray *)unackedS2dRmqIds {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  NSArray *unackedIds = [store unackedS2dRmqIds];
  return unackedIds;
}
151 #pragma mark - FIRMessagingRMQScanner protocol
154 * We don't have a 'getMessages' method - it would require loading in memory
155 * the entire content body of all messages.
157 * Instead we iterate and call 'resend' for each message.
160 * - on connect MCS, to resend any outstanding messages
// Iterates the outgoing RMQ messages held by the persistent store and reports
// each one to the supplied callbacks. The store is not scanned at all when
// both handlers are nil.
//  - rmqMessageHandler, when non-nil, receives (rmqId, tag, data) for every
//    scanned message.
//  - dataMessageHandler, when non-nil, is invoked only for messages whose tag
//    equals kFIRMessagingProtoTagDataMessageStanza, and receives the payload
//    cast to GtalkDataMessageStanza after parsing.
163 - (void)scanWithRmqMessageHandler:(FIRMessagingRmqMessageHandler)rmqMessageHandler
164 dataMessageHandler:(FIRMessagingDataMessageHandler)dataMessageHandler {
165 // no need to scan database with no callbacks
166 if (rmqMessageHandler || dataMessageHandler) {
167 [self.rmq2Store scanOutgoingRmqMessagesWithHandler:^(int64_t rmqId, int8_t tag, NSData *data) {
168 if (rmqMessageHandler != nil) {
169 rmqMessageHandler(rmqId, tag, data);
// The parse below passes error:NULL, so parse failures are not reported; a
// payload that fails to parse would presumably reach the handler as nil --
// TODO confirm parseFromData:error: behavior.
// NOTE(review): `proto` is used below but its declaration is not visible
// (embedded numbering skips 172) -- confirm against the full file.
171 if (dataMessageHandler != nil && kFIRMessagingProtoTagDataMessageStanza == tag) {
173 [FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag) parseFromData:data error:NULL];
174 GtalkDataMessageStanza *stanza = (GtalkDataMessageStanza *)proto;
175 dataMessageHandler(rmqId, stanza);
181 #pragma mark - Remove
/**
 * Hook for per-id acknowledgement handling. Currently does nothing.
 *
 * @param rmqId The acknowledged rmq id; unused for now.
 */
- (void)ackReceivedForRmqId:(NSString *)rmqId {
  // TODO: Optional book-keeping
}
/**
 * Removes a single outgoing message by delegating to -removeRmqMessagesWithRmqIds:.
 *
 * @param rmqId The rmq id of the message to remove. A nil id is treated as "nothing to remove"
 *              and is forwarded as an empty array.
 * @return The int result of -removeRmqMessagesWithRmqIds:.
 */
- (int)removeRmqMessagesWithRmqId:(NSString *)rmqId {
  // An array literal raises NSInvalidArgumentException when handed a nil element, so build an
  // empty array instead and let the plural method apply its own empty-input handling.
  NSArray *rmqIds = (rmqId != nil) ? @[ rmqId ] : @[];
  return [self removeRmqMessagesWithRmqIds:rmqIds];
}
// Removes the outgoing messages with the given ids from the persistent store
// and returns the int result of deleteMessagesFromTable:withRmqIds:.
// Visible steps: check for a nil/empty array, call ackReceivedForRmqId: for
// each id, find the largest numeric id (ids are parsed with longLongValue,
// starting from -1), persist that maximum via saveLastOutgoingRmqId: when it
// is >= self.rmqId, then delete the ids from kTableOutgoingRmqMessages.
// NOTE(review): the body of the empty-input check and several closing braces
// are not visible (embedded numbering skips 193-194, 197, 203-205, 208) --
// presumably the check is an early return; confirm against the full file.
191 - (int)removeRmqMessagesWithRmqIds:(NSArray *)rmqIds {
192 if (![rmqIds count]) {
195 for (NSString *rmqId in rmqIds) {
196 [self ackReceivedForRmqId:rmqId];
198 int64_t maxRmqId = -1;
199 for (NSString *rmqId in rmqIds) {
200 int64_t rmqIdValue = [rmqId longLongValue];
201 if (rmqIdValue > maxRmqId) {
202 maxRmqId = rmqIdValue;
206 if (maxRmqId >= self.rmqId) {
207 [self saveLastOutgoingRmqId:maxRmqId];
209 return [self.rmq2Store deleteMessagesFromTable:kTableOutgoingRmqMessages withRmqIds:rmqIds];
/**
 * Asks the persistent store to delete the given ids from kTableS2DRmqIds.
 * The store's return value is ignored.
 */
- (void)removeS2dIds:(NSArray *)s2dIds {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  [store deleteMessagesFromTable:kTableS2DRmqIds withRmqIds:s2dIds];
}
216 #pragma mark - Sync Messages
218 // TODO: RMQManager should also have a cache for all the sync messages
219 // so we don't hit the DB each time.
/**
 * Looks up the sync message for `rmqID` in the persistent store.
 *
 * @return Whatever the store's -querySyncMessageWithRmqID: returns for that id.
 */
- (FIRMessagingPersistentSyncMessage *)querySyncMessageWithRmqID:(NSString *)rmqID {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  FIRMessagingPersistentSyncMessage *syncMessage = [store querySyncMessageWithRmqID:rmqID];
  return syncMessage;
}
/**
 * Asks the persistent store to delete the sync message for `rmqID`.
 *
 * @return The BOOL result reported by the store.
 */
- (BOOL)deleteSyncMessageWithRmqID:(NSString *)rmqID {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL didDelete = [store deleteSyncMessageWithRmqID:rmqID];
  return didDelete;
}
/**
 * Forwards to the persistent store's -deleteExpiredOrFinishedSyncMessages:.
 *
 * @param error Passed through unchanged to the store.
 * @return The int result reported by the store.
 */
- (int)deleteExpiredOrFinishedSyncMessages:(NSError **)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  int storeResult = [store deleteExpiredOrFinishedSyncMessages:error];
  return storeResult;
}
// Persists a sync-message record by forwarding rmqID, expirationTime,
// apnsReceived and mcsReceived to the persistent store, and returns the
// store's BOOL result.
// NOTE(review): the message send below is cut off after `mcsReceived:`
// (embedded numbering skips 241-243), so how `error` is handed to the store is
// not visible -- presumably it is forwarded as a final argument; confirm
// against the full file.
232 - (BOOL)saveSyncMessageWithRmqID:(NSString *)rmqID
233 expirationTime:(int64_t)expirationTime
234 apnsReceived:(BOOL)apnsReceived
235 mcsReceived:(BOOL)mcsReceived
236 error:(NSError *__autoreleasing *)error {
237 return [self.rmq2Store saveSyncMessageWithRmqID:rmqID
238 expirationTime:expirationTime
239 apnsReceived:apnsReceived
240 mcsReceived:mcsReceived
/**
 * Forwards to the persistent store's -updateSyncMessageViaAPNSWithRmqID:error:.
 *
 * @param rmqID The rmq id of the sync message to update.
 * @param error Passed through unchanged to the store.
 * @return The BOOL result reported by the store.
 */
- (BOOL)updateSyncMessageViaAPNSWithRmqID:(NSString *)rmqID error:(NSError **)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL didUpdate = [store updateSyncMessageViaAPNSWithRmqID:rmqID error:error];
  return didUpdate;
}
/**
 * Forwards to the persistent store's -updateSyncMessageViaMCSWithRmqID:error:.
 *
 * @param rmqID The rmq id of the sync message to update.
 * @param error Passed through unchanged to the store.
 * @return The BOOL result reported by the store.
 */
- (BOOL)updateSyncMessageViaMCSWithRmqID:(NSString *)rmqID error:(NSError **)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL didUpdate = [store updateSyncMessageViaMCSWithRmqID:rmqID error:error];
  return didUpdate;
}
252 #pragma mark - Testing
/**
 * Forwards `dbName` to +[FIRMessagingRmq2PersistentStore removeDatabase:].
 * Grouped under the "Testing" section of this file.
 */
+ (void)removeDatabaseWithName:(NSString *)dbName {
  Class storeClass = [FIRMessagingRmq2PersistentStore class];
  [storeClass removeDatabase:dbName];
}
258 #pragma mark - Private
260 - (int64_t)nextRmqId {