Added Android code
[wl-app.git] / iOS / Pods / FirebaseMessaging / Firebase / Messaging / FIRMessagingConnection.m
1 /*
2  * Copyright 2017 Google
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #import "FIRMessagingConnection.h"
18
19 #import "Protos/GtalkCore.pbobjc.h"
20 #import "Protos/GtalkExtensions.pbobjc.h"
21
22 #import "FIRMessaging.h"
23 #import "FIRMessagingDataMessageManager.h"
24 #import "FIRMessagingDefines.h"
25 #import "FIRMessagingLogger.h"
26 #import "FIRMessagingRmqManager.h"
27 #import "FIRMessagingSecureSocket.h"
28 #import "FIRMessagingUtilities.h"
29 #import "FIRMessagingVersionUtilities.h"
30 #import "FIRMessaging_Private.h"
31
32 static NSInteger const kIqSelectiveAck = 12;
33 static NSInteger const kIqStreamAck = 13;
34 static int const kInvalidStreamId = -1;
35 // Threshold for number of messages removed that we will ack, for short lived connections
36 static int const kMessageRemoveAckThresholdCount = 5;
37
38 static NSTimeInterval const kHeartbeatInterval = 30.0;
39 static NSTimeInterval const kConnectionTimeout = 20.0;
40 static int32_t const kAckingInterval = 10;
41
42 static NSString *const kUnackedS2dIdKey = @"FIRMessagingUnackedS2dIdKey";
43 static NSString *const kAckedS2dIdMapKey = @"FIRMessagingAckedS2dIdMapKey";
44
45 static NSString *const kRemoteFromAddress = @"from";
46
47 @interface FIRMessagingD2SInfo : NSObject
48
49 @property(nonatomic, readwrite, assign) int streamId;
50 @property(nonatomic, readwrite, strong) NSString *d2sID;
51 - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID;
52
53 @end
54
55 @implementation FIRMessagingD2SInfo
56
57 - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID {
58   self = [super init];
59   if (self) {
60     _streamId = streamId;
61     _d2sID = [d2sID copy];
62   }
63   return self;
64 }
65
66 - (BOOL)isEqual:(id)object {
67   if ([object isKindOfClass:[self class]]) {
68     FIRMessagingD2SInfo *other = (FIRMessagingD2SInfo *)object;
69     return self.streamId == other.streamId && [self.d2sID isEqualToString:other.d2sID];
70   }
71   return NO;
72 }
73
74 - (NSUInteger)hash {
75   return [self.d2sID hash];
76 }
77
78 @end
79
80 @interface FIRMessagingConnection ()<FIRMessagingSecureSocketDelegate>
81
82 @property(nonatomic, readwrite, weak) FIRMessagingRmqManager *rmq2Manager;
83 @property(nonatomic, readwrite, weak) FIRMessagingDataMessageManager *dataMessageManager;
84
85 @property(nonatomic, readwrite, assign) FIRMessagingConnectionState state;
86 @property(nonatomic, readwrite, copy) NSString *host;
87 @property(nonatomic, readwrite, assign) NSUInteger port;
88
89 @property(nonatomic, readwrite, strong) NSString *authId;
90 @property(nonatomic, readwrite, strong) NSString *token;
91
92 @property(nonatomic, readwrite, strong) FIRMessagingSecureSocket *socket;
93
94 @property(nonatomic, readwrite, assign) int64_t lastLoginServerTimestamp;
95 @property(nonatomic, readwrite, assign) int lastStreamIdAcked;
96 @property(nonatomic, readwrite, assign) int inStreamId;
97 @property(nonatomic, readwrite, assign) int outStreamId;
98
99 @property(nonatomic, readwrite, strong) NSMutableArray *unackedS2dIds;
100 @property(nonatomic, readwrite, strong) NSMutableDictionary *ackedS2dMap;
101 @property(nonatomic, readwrite, strong) NSMutableArray *d2sInfos;
102 // ttl=0 messages that need to be sent as soon as we establish a connection
103 @property(nonatomic, readwrite, strong) NSMutableArray *sendOnConnectMessages;
104
105 @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;
106
107 @end
108
109
110 @implementation FIRMessagingConnection;
111
112 - (instancetype)initWithAuthID:(NSString *)authId
113                          token:(NSString *)token
114                           host:(NSString *)host
115                           port:(NSUInteger)port
116                        runLoop:(NSRunLoop *)runLoop
117                    rmq2Manager:(FIRMessagingRmqManager *)rmq2Manager
118                     fcmManager:(FIRMessagingDataMessageManager *)dataMessageManager {
119   self = [super init];
120   if (self) {
121     _authId = [authId copy];
122     _token = [token copy];
123     _host = [host copy];
124     _port = port;
125     _runLoop = runLoop;
126     _rmq2Manager = rmq2Manager;
127     _dataMessageManager = dataMessageManager;
128
129     _d2sInfos = [NSMutableArray array];
130
131     _unackedS2dIds = [NSMutableArray arrayWithArray:[_rmq2Manager unackedS2dRmqIds]];
132     _ackedS2dMap = [NSMutableDictionary dictionary];
133     _sendOnConnectMessages = [NSMutableArray array];
134   }
135   return self;
136 }
137
138 - (NSString *)description {
139   return [NSString stringWithFormat:@"host: %@, port: %lu, stream id in: %d, stream id out: %d",
140           self.host,
141           _FIRMessaging_UL(self.port),
142           self.inStreamId,
143           self.outStreamId];
144 }
145
146 - (void)signIn {
147   _FIRMessagingDevAssert(self.state == kFIRMessagingConnectionNotConnected, @"Invalid connection state.");
148   if (self.state != kFIRMessagingConnectionNotConnected) {
149     return;
150   }
151
152   // break it up for testing
153   [self setupConnectionSocket];
154   [self connectToSocket:self.socket];
155 }
156
157 - (void)setupConnectionSocket {
158   self.socket = [[FIRMessagingSecureSocket alloc] init];
159   self.socket.delegate = self;
160 }
161
162 - (void)connectToSocket:(FIRMessagingSecureSocket *)socket {
163   self.state = kFIRMessagingConnectionConnecting;
164   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection000,
165                           @"Start connecting to FIRMessaging service.");
166   [socket connectToHost:self.host port:self.port onRunLoop:self.runLoop];
167 }
168
169 - (void)signOut {
170   // Clear the list of messages to be sent on connect. This will only
171   // have messages in it if an error happened before receiving the LoginResponse.
172   [self.sendOnConnectMessages removeAllObjects];
173
174   if (self.state == kFIRMessagingConnectionSignedIn) {
175     [self sendClose];
176   }
177   if (self.state != kFIRMessagingConnectionNotConnected) {
178     [self disconnect];
179   }
180 }
181
182 - (void)teardown {
183   if (self.state != kFIRMessagingConnectionNotConnected) {
184     [self disconnect];
185   }
186 }
187
188 #pragma mark - FIRMessagingSecureSocketDelegate
189
190 - (void)secureSocketDidConnect:(FIRMessagingSecureSocket *)socket {
191   self.state = kFIRMessagingConnectionConnected;
192   self.lastStreamIdAcked = 0;
193   self.inStreamId = 0;
194   self.outStreamId = 0;
195
196   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection001,
197                           @"Connected to FIRMessaging service.");
198   [self resetUnconfirmedAcks];
199   [self sendLoginRequest:self.authId token:self.token];
200 }
201
202 - (void)didDisconnectWithSecureSocket:(FIRMessagingSecureSocket *)socket {
203   _FIRMessagingDevAssert(self.socket == socket, @"Invalid socket");
204   _FIRMessagingDevAssert(self.socket.state == kFIRMessagingSecureSocketClosed, @"Socket already closed");
205
206   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection002,
207                           @"Secure socket disconnected from FIRMessaging service.");
208   [self disconnect];
209   [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonSocketDisconnected];
210 }
211
212 - (void)secureSocket:(FIRMessagingSecureSocket *)socket
213       didReceiveData:(NSData *)data
214              withTag:(int8_t)tag {
215   if (tag < 0) {
216     // Invalid proto tag
217     return;
218   }
219
220   Class klassForTag = FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag);
221   if ([klassForTag isSubclassOfClass:[NSNull class]]) {
222     FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003, @"Invalid tag %d for proto",
223                             tag);
224     return;
225   }
226
227   GPBMessage *proto = [klassForTag parseFromData:data error:NULL];
228   if (tag == kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionConnected) {
229     FIRMessagingLoggerDebug(
230         kFIRMessagingMessageCodeConnection004,
231         @"Should not receive generated message when the connection is not connected.");
232     return;
233   } else if (tag != kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionSignedIn) {
234     FIRMessagingLoggerDebug(
235         kFIRMessagingMessageCodeConnection005,
236         @"Should not receive generated message when the connection is not signed in.");
237     return;
238   }
239
240   // If traffic is received after a heartbeat it is safe to assume the connection is healthy.
241   [self cancelConnectionTimeoutTask];
242   [self performSelector:@selector(sendHeartbeatPing)
243              withObject:nil
244              afterDelay:kHeartbeatInterval];
245
246   [self willProcessProto:proto];
247   switch (tag) {
248     case kFIRMessagingProtoTagLoginResponse:
249       [self didReceiveLoginResponse:(GtalkLoginResponse *)proto];
250       break;
251     case kFIRMessagingProtoTagDataMessageStanza:
252       [self didReceiveDataMessageStanza:(GtalkDataMessageStanza *)proto];
253       break;
254     case kFIRMessagingProtoTagHeartbeatPing:
255       [self didReceiveHeartbeatPing:(GtalkHeartbeatPing *)proto];
256       break;
257     case kFIRMessagingProtoTagHeartbeatAck:
258       [self didReceiveHeartbeatAck:(GtalkHeartbeatAck *)proto];
259       break;
260     case kFIRMessagingProtoTagClose:
261       [self didReceiveClose:(GtalkClose *)proto];
262       break;
263     case kFIRMessagingProtoTagIqStanza:
264       [self handleIqStanza:(GtalkIqStanza *)proto];
265       break;
266     default:
267       [self didReceiveUnhandledProto:proto];
268       break;
269   }
270 }
271
272 // Called from secure socket once we have send the proto with given rmqId over the wire
273 // since we are mostly concerned with user facing messages which certainly have a rmqId
274 // we can retrieve them from the Rmq if necessary to look at stuff but for now we just
275 // log it.
276 - (void)secureSocket:(FIRMessagingSecureSocket *)socket
277  didSendProtoWithTag:(int8_t)tag
278                rmqId:(NSString *)rmqId {
279   // log the message
280   [self logMessage:rmqId messageType:tag isOut:YES];
281 }
282
283 #pragma mark - FIRMessagingTestConnection
284
285 - (void)sendProto:(GPBMessage *)proto {
286   FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(proto);
287   if (tag == kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionConnected) {
288     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection006,
289                             @"Cannot send generated message when the connection is not connected.");
290     return;
291   } else if (tag != kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionSignedIn) {
292     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection007,
293                             @"Cannot send generated message when the connection is not signed in.");
294     return;
295   }
296
297   _FIRMessagingDevAssert(self.socket != nil, @"Socket shouldn't be nil");
298   if (self.socket == nil) {
299     return;
300   }
301
302   [self willSendProto:proto];
303
304   [self.socket sendData:proto.data withTag:tag rmqId:FIRMessagingGetRmq2Id(proto)];
305 }
306
307 - (void)sendOnConnectOrDrop:(GPBMessage *)message {
308   if (self.state == kFIRMessagingConnectionSignedIn) {
309     // If a connection has already been established, send normally
310     [self sendProto:message];
311   } else {
312     // Otherwise add them to the list of messages to send after login
313     [self.sendOnConnectMessages addObject:message];
314   }
315 }
316
317 + (GtalkLoginRequest *)loginRequestWithToken:(NSString *)token authID:(NSString *)authID {
318   GtalkLoginRequest *login = [[GtalkLoginRequest alloc] init];
319   login.accountId = 1000000;
320   login.authService = GtalkLoginRequest_AuthService_AndroidId;
321   login.authToken = token;
322   login.id_p = [NSString stringWithFormat:@"%@-%@", @"ios", FIRMessagingCurrentLibraryVersion()];
323   login.domain = @"mcs.android.com";
324   login.deviceId = [NSString stringWithFormat:@"android-%llx", authID.longLongValue];
325   login.networkType = [self currentNetworkType];
326   login.resource = authID;
327   login.user = authID;
328   login.useRmq2 = YES;
329   login.lastRmqId = 1; // Sending not enabled yet so this stays as 1.
330   return login;
331 }
332
333 + (int32_t)currentNetworkType {
334   // http://developer.android.com/reference/android/net/ConnectivityManager.html
335   int32_t fcmNetworkType;
336   FIRMessagingNetworkStatus type = [[FIRMessaging messaging] networkType];
337   switch (type) {
338     case kFIRMessagingReachabilityReachableViaWiFi:
339       fcmNetworkType = 1;
340       break;
341
342     case kFIRMessagingReachabilityReachableViaWWAN:
343       fcmNetworkType = 0;
344       break;
345
346     default:
347       fcmNetworkType = -1;
348       break;
349   }
350   return fcmNetworkType;
351 }
352
353 - (void)sendLoginRequest:(NSString *)authId
354                    token:(NSString *)token {
355   GtalkLoginRequest *login = [[self class] loginRequestWithToken:token authID:authId];
356
357   // clear the messages sent during last connection
358   if ([self.d2sInfos count]) {
359     [self.d2sInfos removeAllObjects];
360   }
361
362   if (self.unackedS2dIds.count > 0) {
363     FIRMessagingLoggerDebug(
364         kFIRMessagingMessageCodeConnection008,
365         @"There are unacked persistent Ids in the login request: %@",
366         [self.unackedS2dIds.description stringByReplacingOccurrencesOfString:@"%"
367                                                                   withString:@"%%"]);
368   }
369   // Send out acks.
370   for (NSString *unackedPersistentS2dId in self.unackedS2dIds) {
371     [login.receivedPersistentIdArray addObject:unackedPersistentS2dId];
372   }
373
374   GtalkSetting *setting = [[GtalkSetting alloc] init];
375   setting.name = @"new_vc";
376   setting.value = @"1";
377   [login.settingArray addObject:setting];
378
379   [self sendProto:login];
380 }
381
382 - (void)sendHeartbeatAck {
383   [self sendProto:[[GtalkHeartbeatAck alloc] init]];
384 }
385
386 - (void)sendHeartbeatPing {
387   // cancel the previous heartbeat request.
388   [NSObject cancelPreviousPerformRequestsWithTarget:self
389                                            selector:@selector(sendHeartbeatPing)
390                                              object:nil];
391   [self scheduleConnectionTimeoutTask];
392   [self sendProto:[[GtalkHeartbeatPing alloc] init]];
393 }
394
395 + (GtalkIqStanza *)createStreamAck {
396   GtalkIqStanza *iq = [[GtalkIqStanza alloc] init];
397   iq.type = GtalkIqStanza_IqType_Set;
398   iq.id_p = @"";
399   GtalkExtension *ext = [[GtalkExtension alloc] init];
400   ext.id_p = kIqStreamAck;
401   ext.data_p = @"";
402   iq.extension = ext;
403   return iq;
404 }
405
406 - (void)sendStreamAck {
407   GtalkIqStanza *iq = [[self class] createStreamAck];
408   [self sendProto:iq];
409 }
410
411 - (void)sendClose {
412   [self sendProto:[[GtalkClose alloc] init]];
413 }
414
415 - (void)handleIqStanza:(GtalkIqStanza *)iq {
416   if (iq.hasExtension) {
417     if (iq.extension.id_p == kIqStreamAck) {
418       [self didReceiveStreamAck:iq];
419       return;
420     }
421     if (iq.extension.id_p == kIqSelectiveAck) {
422       [self didReceiveSelectiveAck:iq];
423       return;
424     }
425     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection009, @"Unknown ack extension id %d.",
426                             iq.extension.id_p);
427   } else {
428     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection010, @"Ip stanza without extension.");
429   }
430   [self didReceiveUnhandledProto:iq];
431 }
432
433 - (void)didReceiveLoginResponse:(GtalkLoginResponse *)loginResponse {
434   if (loginResponse.hasError) {
435     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection011,
436                             @"Login error with type: %@, message: %@.", loginResponse.error.type,
437                             loginResponse.error.message);
438     return;
439   }
440   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection012, @"Logged onto MCS service.");
441   // We sent the persisted list of unack'd messages with login so we can assume they have been ack'd
442   // by the server.
443   _FIRMessagingDevAssert(self.unackedS2dIds.count == 0, @"No ids present");
444   _FIRMessagingDevAssert(self.outStreamId == 1, @"Login should be the first stream id");
445
446   self.state = kFIRMessagingConnectionSignedIn;
447   self.lastLoginServerTimestamp = loginResponse.serverTimestamp;
448   [self.delegate didLoginWithConnection:self];
449   [self sendHeartbeatPing];
450
451   // Add all the TTL=0 messages on connect
452   for (GPBMessage *message in self.sendOnConnectMessages) {
453     [self sendProto:message];
454   }
455   [self.sendOnConnectMessages removeAllObjects];
456 }
457
458 - (void)didReceiveHeartbeatPing:(GtalkHeartbeatPing *)heartbeatPing {
459   [self sendHeartbeatAck];
460 }
461
462 - (void)didReceiveHeartbeatAck:(GtalkHeartbeatAck *)heartbeatAck {
463 }
464
465 - (void)didReceiveDataMessageStanza:(GtalkDataMessageStanza *)dataMessageStanza {
466   // TODO: Maybe add support raw data later
467   [self.delegate connectionDidRecieveMessage:dataMessageStanza];
468 }
469
470 - (void)didReceiveUnhandledProto:(GPBMessage *)proto {
471   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection013, @"Received unhandled proto");
472 }
473
474 - (void)didReceiveStreamAck:(GtalkIqStanza *)iq {
475   // Server received some stuff from us we don't really need to do anything special
476 }
477
478 - (void)didReceiveSelectiveAck:(GtalkIqStanza *)iq {
479   GtalkExtension *extension = iq.extension;
480   if (extension) {
481     int extensionId = extension.id_p;
482     if (extensionId == kIqSelectiveAck) {
483
484       NSString *dataString = extension.data_p;
485       GtalkSelectiveAck *selectiveAck = [[GtalkSelectiveAck alloc] init];
486       [selectiveAck mergeFromData:[dataString dataUsingEncoding:NSUTF8StringEncoding]
487                 extensionRegistry:nil];
488
489       NSArray <NSString *>*acks = [selectiveAck idArray];
490
491       // we've received ACK's
492       [self.delegate connectionDidReceiveAckForRmqIds:acks];
493
494       // resend unacked messages
495       [self.dataMessageManager resendMessagesWithConnection:self];
496     }
497   }
498 }
499
500 - (void)didReceiveClose:(GtalkClose *)close {
501   [self disconnect];
502 }
503
504 - (void)willProcessProto:(GPBMessage *)proto {
505   self.inStreamId++;
506
507   if ([proto isKindOfClass:GtalkDataMessageStanza.class]) {
508     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection014,
509                             @"RMQ: Receiving %@ with rmq_id: %@ incoming stream Id: %d",
510                             proto.class, FIRMessagingGetRmq2Id(proto), self.inStreamId);
511   } else {
512     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection015,
513                             @"RMQ: Receiving %@ with incoming stream Id: %d.", proto.class,
514                             self.inStreamId);
515   }
516   int streamId = FIRMessagingGetLastStreamId(proto);
517   if (streamId != kInvalidStreamId) {
518     // confirm the D2S messages that were sent by us
519     [self confirmAckedD2sIdsWithStreamId:streamId];
520
521     // We can now confirm that our ack was received by the server and start our unack'd list fresh
522     // with the proto we just received.
523     [self confirmAckedS2dIdsWithStreamId:streamId];
524   }
525   NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
526   if (rmq2Id != nil) {
527     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection016,
528                             @"RMQ: Add unacked persistent Id: %@.",
529                             [rmq2Id stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
530     [self.unackedS2dIds addObject:rmq2Id];
531     [self.rmq2Manager saveS2dMessageWithRmqId:rmq2Id]; // RMQ save
532   }
533   BOOL explicitAck = ([proto isKindOfClass:[GtalkDataMessageStanza class]] &&
534                       [(GtalkDataMessageStanza *)proto immediateAck]);
535   // If we have not sent anything and the ack threshold has been reached then explicitly send one
536   // to notify the server that we have received messages.
537   if (self.inStreamId - self.lastStreamIdAcked >= kAckingInterval || explicitAck) {
538     [self sendStreamAck];
539   }
540 }
541
542 - (void)willSendProto:(GPBMessage *)proto {
543   self.outStreamId++;
544
545   NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
546   if ([rmq2Id length]) {
547     FIRMessagingD2SInfo *d2sInfo = [[FIRMessagingD2SInfo alloc] initWithStreamId:self.outStreamId d2sId:rmq2Id];
548     [self.d2sInfos addObject:d2sInfo];
549   }
550
551   // each time we send a d2s message, it acks previously received
552   // s2d messages via the last (s2d) stream id received.
553
554   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection017,
555                           @"RMQ: Sending %@ with outgoing stream Id: %d.", proto.class,
556                           self.outStreamId);
557   // We have received messages since last time we sent something - send ack info to server.
558   if (self.inStreamId > self.lastStreamIdAcked) {
559     FIRMessagingSetLastStreamId(proto, self.inStreamId);
560     self.lastStreamIdAcked = self.inStreamId;
561   }
562
563   if (self.unackedS2dIds.count > 0) {
564     // Move all 'unack'd' messages to the ack'd map so they can be removed once the
565     // ack is confirmed.
566     NSArray *ackedS2dIds = [NSArray arrayWithArray:self.unackedS2dIds];
567     FIRMessagingLoggerDebug(
568         kFIRMessagingMessageCodeConnection018, @"RMQ: Mark persistent Ids as acked: %@.",
569         [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
570     [self.unackedS2dIds removeAllObjects];
571     self.ackedS2dMap[[@(self.outStreamId) stringValue]] = ackedS2dIds;
572   }
573 }
574
575 #pragma mark - Private
576
577 /**
578  * This processes the s2d message received in reference to the d2s messages
579  * that we have sent before.
580  */
581 - (void)confirmAckedD2sIdsWithStreamId:(int)lastReceivedStreamId {
582   NSMutableArray *d2sIdsAcked = [NSMutableArray array];
583   for (FIRMessagingD2SInfo *d2sInfo in self.d2sInfos) {
584     if (lastReceivedStreamId < d2sInfo.streamId) {
585       break;
586     }
587     [d2sIdsAcked addObject:d2sInfo];
588   }
589
590   NSMutableArray *rmqIds = [NSMutableArray arrayWithCapacity:[d2sIdsAcked count]];
591   // remove ACK'ed messages
592   for (FIRMessagingD2SInfo *d2sInfo in d2sIdsAcked) {
593     if ([d2sInfo.d2sID length]) {
594       [rmqIds addObject:d2sInfo.d2sID];
595     }
596     [self.d2sInfos removeObject:d2sInfo];
597   }
598   [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
599   int count = [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
600   if (kMessageRemoveAckThresholdCount > 0 && count >= kMessageRemoveAckThresholdCount) {
601     // For short lived connections, if a large number of messages are removed, send an
602     // ack straight away so the server knows that this message was received.
603     [self sendStreamAck];
604   }
605 }
606
607 /**
608  * Called when a stream ACK or a selective ACK are received - this indicates the message has
609  * been received by MCS.
610  */
611 - (void)didReceiveAckForRmqIds:(NSArray *)rmqIds {
612   // TODO: let the user know that the following messages were received by the server
613 }
614
615 - (void)confirmAckedS2dIdsWithStreamId:(int)lastReceivedStreamId {
616   // If the server hasn't received the streamId yet.
617   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection019,
618                           @"RMQ: Server last received stream Id: %d.", lastReceivedStreamId);
619   if (lastReceivedStreamId < self.outStreamId) {
620     // TODO: This could be a good indicator that we need to re-send something (acks)?
621     FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection020,
622                             @"RMQ: There are unsent messages that should be send...\n"
623                              "server received: %d\nlast stream id sent: %d",
624                             lastReceivedStreamId, self.outStreamId);
625   }
626
627   NSSet *ackedStreamIds =
628     [self.ackedS2dMap keysOfEntriesPassingTest:^BOOL(id key, id obj, BOOL *stop) {
629       NSString *streamId = key;
630       return streamId.intValue <= lastReceivedStreamId;
631     }];
632   NSMutableArray *s2dIdsToDelete = [NSMutableArray array];
633
634   for (NSString *streamId in ackedStreamIds) {
635     NSArray *ackedS2dIds = self.ackedS2dMap[streamId];
636     if (ackedS2dIds.count > 0) {
637       FIRMessagingLoggerDebug(
638           kFIRMessagingMessageCodeConnection021,
639           @"RMQ: Mark persistent Ids as confirmed by stream id %@: %@.", streamId,
640           [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
641       [self.ackedS2dMap removeObjectForKey:streamId];
642     }
643
644     [s2dIdsToDelete addObjectsFromArray:ackedS2dIds];
645   }
646
647   // clean up s2d ids that the server knows we've received.
648   // we let the server know via a s2d last stream id received in a
649   // d2s message. the server lets us know it has received our d2s
650   // message via a d2s last stream id received in a s2d message.
651   [self.rmq2Manager removeS2dIds:s2dIdsToDelete];
652 }
653
654 - (void)resetUnconfirmedAcks {
655   [self.ackedS2dMap enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
656     [self.unackedS2dIds addObjectsFromArray:obj];
657   }];
658   [self.ackedS2dMap removeAllObjects];
659 }
660
661 - (void)disconnect {
662   _FIRMessagingDevAssert(self.state != kFIRMessagingConnectionNotConnected, @"Connection already not connected");
663   // cancel pending timeout tasks.
664   [self cancelConnectionTimeoutTask];
665   // cancel pending heartbeat.
666   [NSObject cancelPreviousPerformRequestsWithTarget:self
667                                            selector:@selector(sendHeartbeatPing)
668                                              object:nil];
669   // Unset the delegate. FIRMessagingConnection will not receive further events from the socket from now on.
670   self.socket.delegate = nil;
671   [self.socket disconnect];
672   self.state = kFIRMessagingConnectionNotConnected;
673 }
674
675 - (void)connectionTimedOut {
676   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection022,
677                           @"Connection to FIRMessaging service timed out.");
678   [self disconnect];
679   [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonTimeout];
680 }
681
682 - (void)scheduleConnectionTimeoutTask {
683   // cancel the previous heartbeat timeout event and schedule a new one.
684   [self cancelConnectionTimeoutTask];
685   [self performSelector:@selector(connectionTimedOut)
686              withObject:nil
687              afterDelay:[self connectionTimeoutInterval]];
688 }
689
690 - (void)cancelConnectionTimeoutTask {
691   // cancel pending timeout tasks.
692   [NSObject cancelPreviousPerformRequestsWithTarget:self
693                                            selector:@selector(connectionTimedOut)
694                                              object:nil];
695 }
696
697 - (void)logMessage:(NSString *)description messageType:(int)messageType isOut:(BOOL)isOut {
698   messageType = isOut ? -messageType : messageType;
699   FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection023,
700                           @"Send msg: %@ type: %d inStreamId: %d outStreamId: %d", description,
701                           messageType, self.inStreamId, self.outStreamId);
702 }
703
704 - (NSTimeInterval)connectionTimeoutInterval {
705   return kConnectionTimeout;
706 }
707
708 @end