2 * Copyright 2017 Google
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #import "FIRMessagingSecureSocket.h"
19 #import "GPBMessage.h"
20 #import "GPBCodedOutputStream.h"
21 #import "GPBUtilities.h"
23 #import "FIRMessagingCodedInputStream.h"
24 #import "FIRMessagingDefines.h"
25 #import "FIRMessagingLogger.h"
26 #import "FIRMessagingPacketQueue.h"
28 static const NSUInteger kMaxBufferLength = 1024 * 1024; // 1M
29 static const NSUInteger kBufferLengthIncrement = 16 * 1024; // 16k
30 static const uint8_t kVersion = 40;
31 static const uint8_t kInvalidTag = -1;
33 typedef NS_ENUM(NSUInteger, FIRMessagingSecureSocketReadResult) {
34 kFIRMessagingSecureSocketReadResultNone,
35 kFIRMessagingSecureSocketReadResultIncomplete,
36 kFIRMessagingSecureSocketReadResultCorrupt,
37 kFIRMessagingSecureSocketReadResultSuccess
40 static int32_t LogicalRightShift32(int32_t value, int32_t spaces) {
41 return (int32_t)((uint32_t)(value) >> spaces);
44 static NSUInteger SerializedSize(int32_t value) {
47 if ((value & ~0x7F) == 0) {
48 bytes += sizeof(uint8_t);
51 bytes += sizeof(uint8_t);
52 value = LogicalRightShift32(value, 7);
57 @interface FIRMessagingSecureSocket() <NSStreamDelegate>
59 @property(nonatomic, readwrite, assign) FIRMessagingSecureSocketState state;
60 @property(nonatomic, readwrite, strong) NSInputStream *inStream;
61 @property(nonatomic, readwrite, strong) NSOutputStream *outStream;
63 @property(nonatomic, readwrite, strong) NSMutableData *inputBuffer;
64 @property(nonatomic, readwrite, assign) NSUInteger inputBufferLength;
65 @property(nonatomic, readwrite, strong) NSMutableData *outputBuffer;
66 @property(nonatomic, readwrite, assign) NSUInteger outputBufferLength;
68 @property(nonatomic, readwrite, strong) FIRMessagingPacketQueue *packetQueue;
69 @property(nonatomic, readwrite, assign) BOOL isVersionSent;
70 @property(nonatomic, readwrite, assign) BOOL isVersionReceived;
71 @property(nonatomic, readwrite, assign) BOOL isInStreamOpen;
72 @property(nonatomic, readwrite, assign) BOOL isOutStreamOpen;
74 @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;
75 @property(nonatomic, readwrite, strong) NSString *currentRmqIdBeingSent;
76 @property(nonatomic, readwrite, assign) int8_t currentProtoTypeBeingSent;
80 @implementation FIRMessagingSecureSocket
82 - (instancetype)init {
85 _state = kFIRMessagingSecureSocketNotOpen;
86 _inputBuffer = [NSMutableData dataWithLength:kBufferLengthIncrement];
87 _packetQueue = [[FIRMessagingPacketQueue alloc] init];
88 _currentProtoTypeBeingSent = kInvalidTag;
97 - (void)connectToHost:(NSString *)host
99 onRunLoop:(NSRunLoop *)runLoop {
100 _FIRMessagingDevAssert(host != nil, @"Invalid host");
101 _FIRMessagingDevAssert(runLoop != nil, @"Invalid runloop");
102 _FIRMessagingDevAssert(self.state == kFIRMessagingSecureSocketNotOpen, @"Socket is already connected");
104 if (!host || self.state != kFIRMessagingSecureSocketNotOpen) {
108 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket000,
109 @"Opening secure socket to FIRMessaging service");
110 self.state = kFIRMessagingSecureSocketOpening;
111 self.runLoop = runLoop;
112 CFReadStreamRef inputStreamRef;
113 CFWriteStreamRef outputStreamRef;
114 CFStreamCreatePairWithSocketToHost(NULL,
115 (__bridge CFStringRef)host,
119 self.inStream = CFBridgingRelease(inputStreamRef);
120 self.outStream = CFBridgingRelease(outputStreamRef);
121 if (!self.inStream || !self.outStream) {
122 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket001,
123 @"Failed to initialize socket.");
127 self.isInStreamOpen = NO;
128 self.isOutStreamOpen = NO;
130 BOOL isVOIPSocket = NO;
132 [self openStream:self.outStream isVOIPStream:isVOIPSocket];
133 [self openStream:self.inStream isVOIPStream:isVOIPSocket];
137 if (self.state == kFIRMessagingSecureSocketClosing) {
140 if (!self.inStream && !self.outStream) {
141 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket002,
142 @"The socket is not open or already closed.");
143 _FIRMessagingDevAssert(self.state == kFIRMessagingSecureSocketClosed || self.state == kFIRMessagingSecureSocketNotOpen,
144 @"Socket is already disconnected.");
148 self.state = kFIRMessagingSecureSocketClosing;
150 [self closeStream:self.inStream];
153 if (self.outStream) {
154 [self closeStream:self.outStream];
155 self.outStream = nil;
157 self.state = kFIRMessagingSecureSocketClosed;
158 [self.delegate didDisconnectWithSecureSocket:self];
161 - (void)sendData:(NSData *)data withTag:(int8_t)tag rmqId:(NSString *)rmqId {
162 [self.packetQueue push:[FIRMessagingPacket packetWithTag:tag rmqId:rmqId data:data]];
163 if ([self.outStream hasSpaceAvailable]) {
168 #pragma mark - NSStreamDelegate
170 - (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)eventCode {
172 case NSStreamEventHasBytesAvailable:
173 if (self.state != kFIRMessagingSecureSocketOpen) {
174 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket003,
175 @"Try to read from socket that is not opened");
178 _FIRMessagingDevAssert(stream == self.inStream, @"Incorrect stream");
179 if (![self performRead]) {
180 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket004,
181 @"Error occured when reading incoming stream");
185 case NSStreamEventEndEncountered:
186 FIRMessagingLoggerDebug(
187 kFIRMessagingMessageCodeSecureSocket005, @"%@ end encountered",
188 stream == self.inStream
190 : (stream == self.outStream ? @"Output stream" : @"Unknown stream"));
193 case NSStreamEventOpenCompleted:
194 if (stream == self.inStream) {
195 self.isInStreamOpen = YES;
196 } else if (stream == self.outStream) {
197 self.isOutStreamOpen = YES;
199 if (self.isInStreamOpen && self.isOutStreamOpen) {
200 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket006,
201 @"Secure socket to FIRMessaging service opened");
202 self.state = kFIRMessagingSecureSocketOpen;
203 [self.delegate secureSocketDidConnect:self];
206 case NSStreamEventErrorOccurred: {
207 FIRMessagingLoggerDebug(
208 kFIRMessagingMessageCodeSecureSocket007, @"%@ error occurred",
209 stream == self.inStream
211 : (stream == self.outStream ? @"Output stream" : @"Unknown stream"));
215 case NSStreamEventHasSpaceAvailable:
216 if (self.state != kFIRMessagingSecureSocketOpen) {
217 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket008,
218 @"Try to write to socket that is not opened");
221 _FIRMessagingDevAssert(stream == self.outStream, @"Incorrect stream");
229 #pragma mark - Private
231 - (void)openStream:(NSStream *)stream isVOIPStream:(BOOL)isVOIPStream {
232 _FIRMessagingDevAssert(stream != nil, @"Invalid stream");
233 _FIRMessagingDevAssert(self.runLoop != nil, @"Invalid runloop");
236 _FIRMessagingDevAssert([stream streamStatus] == NSStreamStatusNotOpen, @"Stream already open");
237 if ([stream streamStatus] != NSStreamStatusNotOpen) {
238 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket009,
239 @"stream should not be open.");
242 [stream setProperty:NSStreamSocketSecurityLevelNegotiatedSSL
243 forKey:NSStreamSocketSecurityLevelKey];
245 [stream setProperty:NSStreamNetworkServiceTypeVoIP
246 forKey:NSStreamNetworkServiceType];
248 stream.delegate = self;
249 [stream scheduleInRunLoop:self.runLoop forMode:NSDefaultRunLoopMode];
254 - (void)closeStream:(NSStream *)stream {
255 _FIRMessagingDevAssert(stream != nil, @"Invalid stream");
256 _FIRMessagingDevAssert(self.runLoop != nil, @"Invalid runloop");
260 [stream removeFromRunLoop:self.runLoop forMode:NSDefaultRunLoopMode];
261 stream.delegate = nil;
265 - (BOOL)performRead {
266 _FIRMessagingDevAssert(self.state == kFIRMessagingSecureSocketOpen, @"Socket should be open");
268 if (!self.isVersionReceived) {
269 self.isVersionReceived = YES;
270 uint8_t versionByte = 0;
271 NSInteger bytesRead = [self.inStream read:&versionByte maxLength:sizeof(uint8_t)];
272 if (bytesRead != sizeof(uint8_t) || kVersion != versionByte) {
273 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket010,
274 @"Version do not match. Received %d, Expecting %d", versionByte,
281 BOOL isInputBufferValid = [self.inputBuffer length] > 0;
282 _FIRMessagingDevAssert(isInputBufferValid,
283 @"Invalid input buffer size %lu. Used bytes length %lu, buffer content: %@",
284 _FIRMessaging_UL([self.inputBuffer length]),
285 _FIRMessaging_UL(self.inputBufferLength),
287 if (!isInputBufferValid) {
288 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket011,
289 @"Input buffer is not valid.");
293 if (![self.inStream hasBytesAvailable]) {
297 // try to read more data
298 uint8_t *unusedBufferPtr = (uint8_t *)self.inputBuffer.mutableBytes + self.inputBufferLength;
299 NSUInteger unusedBufferLength = [self.inputBuffer length] - self.inputBufferLength;
300 NSInteger bytesRead = [self.inStream read:unusedBufferPtr maxLength:unusedBufferLength];
301 if (bytesRead <= 0) {
302 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket012,
303 @"Failed to read input stream. Bytes read %ld, Used buffer size %lu, "
304 @"Unused buffer size %lu",
305 _FIRMessaging_UL(bytesRead), _FIRMessaging_UL(self.inputBufferLength),
306 _FIRMessaging_UL(unusedBufferLength));
309 // did successfully read some more data
310 self.inputBufferLength += (NSUInteger)bytesRead;
312 if ([self.inputBuffer length] <= self.inputBufferLength) {
313 // shouldn't be reading more than 1MB of data in one go
314 if ([self.inputBuffer length] + kBufferLengthIncrement > kMaxBufferLength) {
315 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket013,
316 @"Input buffer exceed 1M, disconnect socket");
319 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket014,
320 @"Input buffer limit exceeded. Used input buffer size %lu, "
321 @"Total input buffer size %lu. No unused buffer left. "
322 @"Increase buffer size.",
323 _FIRMessaging_UL(self.inputBufferLength),
324 _FIRMessaging_UL([self.inputBuffer length]));
325 [self.inputBuffer increaseLengthBy:kBufferLengthIncrement];
326 _FIRMessagingDevAssert([self.inputBuffer length] > self.inputBufferLength, @"Invalid buffer size");
329 while (self.inputBufferLength > 0 && [self.inputBuffer length] > 0) {
330 _FIRMessagingDevAssert([self.inputBuffer length] >= self.inputBufferLength,
331 @"Buffer longer than length");
332 NSRange inputRange = NSMakeRange(0, self.inputBufferLength);
333 size_t protoBytes = 0;
334 // read the actual proto data coming in
335 FIRMessagingSecureSocketReadResult readResult =
336 [self processCurrentInputBuffer:[self.inputBuffer subdataWithRange:inputRange]
337 outOffset:&protoBytes];
338 // Corrupt data encountered, stop processing.
339 if (readResult == kFIRMessagingSecureSocketReadResultCorrupt) {
341 // Incomplete data, keep trying to read by loading more from the stream.
342 } else if (readResult == kFIRMessagingSecureSocketReadResultIncomplete) {
345 _FIRMessagingDevAssert(self.inputBufferLength >= protoBytes, @"More bytes than buffer can handle");
346 // we have read (0, protoBytes) of data in the inputBuffer
347 if (protoBytes == self.inputBufferLength) {
348 // did completely read the buffer data can be reset for further processing
349 self.inputBufferLength = 0;
351 // delete processed bytes while maintaining the buffer size.
352 NSUInteger prevLength __unused = [self.inputBuffer length];
353 // delete the processed bytes
354 [self.inputBuffer replaceBytesInRange:NSMakeRange(0, protoBytes) withBytes:NULL length:0];
355 // reallocate more data
356 [self.inputBuffer increaseLengthBy:protoBytes];
357 _FIRMessagingDevAssert([self.inputBuffer length] == prevLength,
358 @"Invalid input buffer size %lu. Used bytes length %lu, "
359 @"buffer content: %@",
360 _FIRMessaging_UL([self.inputBuffer length]),
361 _FIRMessaging_UL(self.inputBufferLength),
363 self.inputBufferLength -= protoBytes;
370 - (FIRMessagingSecureSocketReadResult)processCurrentInputBuffer:(NSData *)readData
371 outOffset:(size_t *)outOffset {
374 FIRMessagingCodedInputStream *input = [[FIRMessagingCodedInputStream alloc] initWithData:readData];
376 if (![input readTag:&rawTag]) {
377 return kFIRMessagingSecureSocketReadResultIncomplete;
380 if (![input readLength:&length]) {
381 return kFIRMessagingSecureSocketReadResultIncomplete;
383 // NOTE tag can be zero for |HeartbeatPing|, and length can be zero for |Close| proto
384 _FIRMessagingDevAssert(rawTag >= 0 && length >= 0, @"Invalid tag or length");
385 if (rawTag < 0 || length < 0) {
386 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket015, @"Buffer data corrupted.");
387 return kFIRMessagingSecureSocketReadResultCorrupt;
389 NSData *data = [input readDataWithLength:(uint32_t)length];
391 FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket016,
392 @"Incomplete data, buffered data length %ld, expected length %d",
393 _FIRMessaging_UL(self.inputBufferLength), length);
394 return kFIRMessagingSecureSocketReadResultIncomplete;
396 [self.delegate secureSocket:self didReceiveData:data withTag:rawTag];
397 *outOffset = input.offset;
398 return kFIRMessagingSecureSocketReadResultSuccess;
401 - (void)performWrite {
402 _FIRMessagingDevAssert(self.state == kFIRMessagingSecureSocketOpen, @"Invalid socket state");
404 if (!self.isVersionSent) {
405 self.isVersionSent = YES;
406 uint8_t versionByte = kVersion;
407 [self.outStream write:&versionByte maxLength:sizeof(uint8_t)];
410 while (!self.packetQueue.isEmpty && self.outStream.hasSpaceAvailable) {
411 if (self.outputBuffer.length == 0) {
412 // serialize new packets only when the output buffer is flushed.
413 FIRMessagingPacket *packet = [self.packetQueue pop];
414 self.currentRmqIdBeingSent = packet.rmqId;
415 self.currentProtoTypeBeingSent = packet.tag;
416 NSUInteger length = SerializedSize(packet.tag) +
417 SerializedSize((int)packet.data.length) + packet.data.length;
418 self.outputBuffer = [NSMutableData dataWithLength:length];
419 GPBCodedOutputStream *output = [GPBCodedOutputStream streamWithData:self.outputBuffer];
420 [output writeRawVarint32:packet.tag];
421 [output writeBytesNoTag:packet.data];
422 self.outputBufferLength = 0;
425 // flush the output buffer.
426 NSInteger written = [self.outStream write:self.outputBuffer.bytes + self.outputBufferLength
427 maxLength:self.outputBuffer.length - self.outputBufferLength];
431 self.outputBufferLength += (NSUInteger)written;
432 if (self.outputBufferLength >= self.outputBuffer.length) {
433 self.outputBufferLength = 0;
434 self.outputBuffer = nil;
435 [self.delegate secureSocket:self
436 didSendProtoWithTag:self.currentProtoTypeBeingSent
437 rmqId:self.currentRmqIdBeingSent];
438 self.currentRmqIdBeingSent = nil;
439 self.currentProtoTypeBeingSent = kInvalidTag;