//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import XCTest
@testable import AppSyncRealTimeClient

/// Integration tests that drive `AppSyncSubscriptionConnection` through a connection
/// provider created by `ConnectionProviderFactory` with the `.appSyncRealtime` type.
class AppSyncRealTimeClientIntegrationTests: AppSyncRealTimeClientTestBase {

    /// Simple integration test against an AppSync service provisioned with a simple
    /// Todo model generated by the GraphQL Transform on the `model` directive.
    ///
    /// - Given: A subscription connection on an AppSync endpoint with Todo model provisioned
    /// - When:
    ///    - Subscribe to the `onCreateTodo`
    /// - Then:
    ///    - Websocket connection and subscription connection are established.
    ///
    func testSubscribeWithSubscriptionConnection() {
        let subscribeSuccess = expectation(description: "subscribe successfully")
        let subscriptionConnection = AppSyncSubscriptionConnection(provider: makeConnectionProvider())

        _ = subscriptionConnection.subscribe(
            requestString: requestString,
            variables: nil
        ) { event, _ in
            switch event {
            case .connection(let subscriptionConnectionEvent):
                // Only the `.connected` transition matters here; the switch is kept
                // exhaustive so a new connection event becomes a compile error.
                switch subscriptionConnectionEvent {
                case .connecting:
                    break
                case .connected:
                    subscribeSuccess.fulfill()
                case .disconnected:
                    break
                }
            case .data(let data):
                print("Got data back \(data)")
            case .failed(let error):
                XCTFail("Got error \(error)")
            }
        }

        wait(for: [subscribeSuccess], timeout: TestCommonConstants.networkTimeout)
    }

    /// The purpose of this test is to ensure that all websockets can be successfully
    /// created, exercised and terminated while keeping a single connection provider in
    /// memory.
    ///
    /// Specifically, the following test exercises the following:
    /// 1. Create a new connection provider
    /// 2. Create multiple subscriptions
    /// 3. Unsubscribe the subscriptions
    /// 4. Ensure the socket is disconnected
    /// 5. Repeat Steps 2-4 with the existing connection provider.
    ///
    /// - Given: Connected subscriptions
    /// - When:
    ///    - All subscription items are unsubscribed
    /// - Then:
    ///    - Underlying websocket is disconnected
    func testAllSubscriptionsCancelledShouldDisconnectTheWebsocket2() {
        let connectionProvider = makeConnectionProvider()
        guard let realTimeConnectionProvider = connectionProvider as? RealtimeConnectionProvider else {
            XCTFail("Could not retrieve concrete connection provider")
            return
        }

        // Steps 2-4: three subscriptions share the provider. The provider must report
        // `.connected` until the last of them has been unsubscribed.
        let subscriptions = subscribe(connectionProvider, count: 3)
        assertStatus(of: realTimeConnectionProvider, equals: .connected)

        let lastIndex = subscriptions.count - 1
        for (index, subscription) in subscriptions.enumerated() {
            let (item, subscriptionConnection) = subscription
            subscriptionConnection.unsubscribe(item: item)

            let expectedStatus: ConnectionState = index == lastIndex ? .notConnected : .connected
            assertStatus(of: realTimeConnectionProvider, equals: expectedStatus)
        }

        // Step 5: the same provider must be able to connect again.
        let resubscriptions = subscribe(connectionProvider, count: 1)
        assertStatus(of: realTimeConnectionProvider, equals: .connected)

        for (item, subscriptionConnection) in resubscriptions {
            subscriptionConnection.unsubscribe(item: item)
        }

        // Pause kept from the original test before the final status check.
        // NOTE(review): the teardown above asserts `.notConnected` without pausing;
        // confirm whether this delay is still required.
        sleep(5)
        assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
    }

    /// The purpose of this test is to ensure that a significant number of subscriptions
    /// can be created on a websocket, then unsubscribed, and repeated.
    ///
    /// Specifically, the following test exercises the following:
    /// 1. Create a new connection provider
    /// 2. Create multiple subscriptions
    /// 3. Unsubscribe the subscriptions
    /// 4. Ensure the socket is disconnected
    /// 5. Repeat Steps 2-4 with the existing connection provider.
    ///
    /// - Given: Connected subscriptions
    /// - When:
    ///    - All subscription items are unsubscribed
    /// - Then:
    ///    - Underlying websocket is disconnected
    func testSubscribeUnsubscribeRepeat() {
        let connectionProvider = makeConnectionProvider()
        guard let realTimeConnectionProvider = connectionProvider as? RealtimeConnectionProvider else {
            XCTFail("Could not retrieve concrete connection provider")
            return
        }

        let count = 30
        let rounds = 2

        // Each round subscribes `count` times, checks the provider is connected,
        // unsubscribes everything, and checks the provider is no longer connected.
        for _ in 0 ..< rounds {
            let subscriptions = subscribe(connectionProvider, count: count)
            assertStatus(of: realTimeConnectionProvider, equals: .connected)

            for (item, subscriptionConnection) in subscriptions {
                subscriptionConnection.unsubscribe(item: item)
            }
            assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
        }
    }

    /// Subscribes and immediately unsubscribes from 1,000 concurrently executed
    /// iterations that all share one connection provider.
    ///
    /// - Given: A single connection provider
    /// - When:
    ///    - 1,000 iterations each create a subscription and unsubscribe it right away
    /// - Then:
    ///    - Every iteration completes and the provider reports `.notConnected`
    func testMultipleThreadsSubscribeUnsubscribe() {
        let connectionProvider = makeConnectionProvider()
        guard let realTimeConnectionProvider = connectionProvider as? RealtimeConnectionProvider else {
            XCTFail("Could not retrieve concrete connection provider")
            return
        }

        let iterations = 1_000
        let expectedPerforms = expectation(description: "total performs")
        expectedPerforms.expectedFulfillmentCount = iterations

        DispatchQueue.concurrentPerform(iterations: iterations) { _ in
            let subscriptionConnection = AppSyncSubscriptionConnection(provider: connectionProvider)
            let item = subscriptionConnection.subscribe(
                requestString: requestString,
                variables: nil
            ) { _, _ in }
            subscriptionConnection.unsubscribe(item: item)
            expectedPerforms.fulfill()
        }

        // `concurrentPerform` returns only after all iterations have run, so the
        // expectation is already satisfied here; the shared timeout constant is used
        // purely for consistency with the other tests.
        wait(for: [expectedPerforms], timeout: TestCommonConstants.networkTimeout)
        assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
    }

    // MARK: - Helpers

    /// Builds a connection provider for `urlRequest`, authenticated with `apiKey`,
    /// using the `.appSyncRealtime` connection type.
    ///
    /// - Returns: The provider created by `ConnectionProviderFactory`.
    private func makeConnectionProvider() -> ConnectionProvider {
        let authInterceptor = APIKeyAuthInterceptor(apiKey)
        return ConnectionProviderFactory.createConnectionProvider(
            for: urlRequest,
            authInterceptor: authInterceptor,
            connectionType: .appSyncRealtime
        )
    }

    /// Creates `count` subscriptions on `connectionProvider` and blocks until each of
    /// them has delivered a `.connected` connection event (or the network timeout
    /// elapses).
    ///
    /// - Parameters:
    ///   - connectionProvider: The provider shared by every created subscription connection.
    ///   - count: Number of subscriptions to create. Values less than one create nothing.
    /// - Returns: One `(item, connection)` pair per subscription, in creation order;
    ///   empty when `count` is less than one.
    private func subscribe(
        _ connectionProvider: ConnectionProvider,
        count: Int
    ) -> [(SubscriptionItem, AppSyncSubscriptionConnection)] {
        // A non-positive count would otherwise form an invalid range and an
        // expectation with a zero fulfillment count.
        guard count > 0 else {
            return []
        }

        let connectedInvoked = expectation(description: "Connection established")
        connectedInvoked.expectedFulfillmentCount = count

        var subscriptions = [(SubscriptionItem, AppSyncSubscriptionConnection)]()
        subscriptions.reserveCapacity(count)

        for _ in 0 ..< count {
            let subscriptionConnection = AppSyncSubscriptionConnection(provider: connectionProvider)
            let item = subscriptionConnection.subscribe(
                requestString: requestString,
                variables: nil
            ) { event, _ in
                if case .connection(.connected) = event {
                    connectedInvoked.fulfill()
                }
            }
            subscriptions.append((item, subscriptionConnection))
        }

        wait(for: [connectedInvoked], timeout: TestCommonConstants.networkTimeout)
        return subscriptions
    }

    /// Checks the status of the provider in a thread-safe way. This is only needed for tests; real-world
    /// call sites wouldn't be able to access the `status` as it has `internal` access.
    ///
    /// - Parameters:
    ///   - provider: The provider whose `status` is read on its `connectionQueue`.
    ///   - status: The expected connection state.
    ///   - file: Source file a failure is attributed to; defaults to the caller's file.
    ///   - line: Source line a failure is attributed to; defaults to the caller's line.
    private func assertStatus(
        of provider: RealtimeConnectionProvider,
        equals status: ConnectionState,
        file: StaticString = #filePath,
        line: UInt = #line
    ) {
        provider.connectionQueue.sync {
            XCTAssertEqual(provider.status, status, file: file, line: line)
        }
    }
}