#include "Include.h" namespace Canary { Peer::Peer() : pAwsCredentialProvider(nullptr), terminated(FALSE), iceGatheringDone(FALSE), receivedOffer(FALSE), receivedAnswer(FALSE), foundPeerId(FALSE), pPeerConnection(nullptr), status(STATUS_SUCCESS) { } Peer::~Peer() { CHK_LOG_ERR(freePeerConnection(&this->pPeerConnection)); CHK_LOG_ERR(freeSignalingClient(&this->signalingClientHandle)); if(this->useIotCredentialProvider) { CHK_LOG_ERR(freeIotCredentialProvider(&this->pAwsCredentialProvider)); } else { CHK_LOG_ERR(freeStaticCredentialProvider(&this->pAwsCredentialProvider)); } } STATUS Peer::init(const Canary::PConfig pConfig, const Callbacks& callbacks) { STATUS retStatus = STATUS_SUCCESS; this->isMaster = pConfig->isMaster.value; this->trickleIce = pConfig->trickleIce.value; this->callbacks = callbacks; this->canaryOutgoingRTPMetricsContext.prevTs = GETTIME(); this->canaryOutgoingRTPMetricsContext.prevFramesDiscardedOnSend = 0; this->canaryOutgoingRTPMetricsContext.prevNackCount = 0; this->canaryOutgoingRTPMetricsContext.prevRetxBytesSent = 0; this->canaryOutgoingRTPMetricsContext.prevFramesSent = 0; this->canaryIncomingRTPMetricsContext.prevPacketsReceived = 0; this->canaryIncomingRTPMetricsContext.prevBytesReceived = 0; this->canaryIncomingRTPMetricsContext.prevFramesDropped = 0; this->canaryIncomingRTPMetricsContext.prevTs = GETTIME(); this->firstFrame = TRUE; this->useIotCredentialProvider = pConfig->useIotCredentialProvider.value; if(this->useIotCredentialProvider) { CHK_STATUS(createLwsIotCredentialProvider((PCHAR) pConfig->iotEndpoint, (PCHAR) pConfig->iotCoreCert.value.c_str(), (PCHAR) pConfig->iotCorePrivateKey.value.c_str(), (PCHAR) pConfig->caCertPath.value.c_str(), (PCHAR) pConfig->iotCoreRoleAlias.value.c_str(), (PCHAR) pConfig->channelName.value.c_str(), &pAwsCredentialProvider)); } else { CHK_STATUS(createStaticCredentialProvider((PCHAR) pConfig->accessKey.value.c_str(), 0, (PCHAR) pConfig->secretKey.value.c_str(), 0, (PCHAR) pConfig->sessionToken.value.c_str(), 0, MAX_UINT64, &pAwsCredentialProvider)); } CHK_STATUS(initSignaling(pConfig)); CHK_STATUS(initRtcConfiguration(pConfig)); CleanUp: return retStatus; } STATUS Peer::initSignaling(const Canary::PConfig pConfig) { STATUS retStatus = STATUS_SUCCESS; ChannelInfo channelInfo; SignalingClientCallbacks clientCallbacks; CHAR controlPlaneUrl[MAX_CONTROL_PLANE_URI_CHAR_LEN]; MEMSET(&this->clientInfo, 0, SIZEOF(this->clientInfo)); MEMSET(&channelInfo, 0, SIZEOF(channelInfo)); MEMSET(&clientCallbacks, 0, SIZEOF(clientCallbacks)); this->clientInfo.version = SIGNALING_CLIENT_INFO_CURRENT_VERSION; this->clientInfo.loggingLevel = pConfig->logLevel.value; STRCPY(this->clientInfo.clientId, pConfig->clientId.value.c_str()); channelInfo.version = CHANNEL_INFO_CURRENT_VERSION; if (!pConfig->endpoint.value.empty()) { SNPRINTF(controlPlaneUrl, MAX_CONTROL_PLANE_URI_CHAR_LEN, "%s%s", CONTROL_PLANE_URI_PREFIX, pConfig->endpoint.value.c_str()); channelInfo.pControlPlaneUrl = (PCHAR) controlPlaneUrl; } channelInfo.pChannelName = (PCHAR) pConfig->channelName.value.c_str(); channelInfo.pRegion = (PCHAR) pConfig->region.value.c_str(); channelInfo.pKmsKeyId = NULL; channelInfo.tagCount = 0; channelInfo.pTags = NULL; channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER; channelInfo.channelRoleType = pConfig->isMaster.value ? SIGNALING_CHANNEL_ROLE_TYPE_MASTER : SIGNALING_CHANNEL_ROLE_TYPE_VIEWER; channelInfo.cachingPolicy = SIGNALING_API_CALL_CACHE_TYPE_FILE; channelInfo.cachingPeriod = SIGNALING_API_CALL_CACHE_TTL_SENTINEL_VALUE; channelInfo.asyncIceServerConfig = TRUE; channelInfo.retry = TRUE; channelInfo.reconnect = TRUE; channelInfo.pCertPath = (PCHAR) DEFAULT_KVS_CACERT_PATH; channelInfo.messageTtl = 0; // Default is 60 seconds this->clientInfo.signalingClientCreationMaxRetryAttempts = MAX_CALL_RETRY_COUNT; clientCallbacks.customData = (UINT64) this; clientCallbacks.stateChangeFn = [](UINT64 customData, SIGNALING_CLIENT_STATE state) -> STATUS { STATUS retStatus = STATUS_SUCCESS; PPeer pPeer = (PPeer) customData; PCHAR pStateStr; signalingClientGetStateString(state, &pStateStr); DLOGD("Signaling client state changed to %d - '%s'", state, pStateStr); switch (state) { case SIGNALING_CLIENT_STATE_NEW: pPeer->signalingStartTime = GETTIME(); break; case SIGNALING_CLIENT_STATE_CONNECTED: { if (!pPeer->initializedSignaling) { auto duration = (GETTIME() - pPeer->signalingStartTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND; DLOGI("Signaling took %lu ms to connect", duration); Canary::Cloudwatch::getInstance().monitoring.pushSignalingInitDelay(duration, Aws::CloudWatch::Model::StandardUnit::Milliseconds); pPeer->initializedSignaling = TRUE; } break; } default: break; } // Return success to continue return retStatus; }; clientCallbacks.errorReportFn = [](UINT64 customData, STATUS status, PCHAR msg, UINT32 msgLen) -> STATUS { PPeer pPeer = (PPeer) customData; DLOGW("Signaling client generated an error 0x%08x - '%.*s'", status, msgLen, msg); // When an error happens with signaling, we'll let it crash so that this canary can be restarted. // The error will be captured in at higher level metrics. if (status == STATUS_SIGNALING_ICE_CONFIG_REFRESH_FAILED || status == STATUS_SIGNALING_RECONNECT_FAILED) { pPeer->status = status; // Let the higher level to terminate if (pPeer->callbacks.onDisconnected != NULL) { pPeer->callbacks.onDisconnected(); } } return STATUS_SUCCESS; }; clientCallbacks.messageReceivedFn = [](UINT64 customData, PReceivedSignalingMessage pMsg) -> STATUS { STATUS retStatus = STATUS_SUCCESS; PPeer pPeer = (PPeer) customData; std::lock_guard<std::recursive_mutex> lock(pPeer->mutex); if (!pPeer->foundPeerId.load()) { pPeer->peerId = pMsg->signalingMessage.peerClientId; DLOGI("Found peer id: %s", pPeer->peerId.c_str()); pPeer->foundPeerId = TRUE; CHK_STATUS(pPeer->initPeerConnection()); } if (pPeer->isMaster && STRCMP(pPeer->peerId.c_str(), pMsg->signalingMessage.peerClientId) != 0) { DLOGW("Unexpected receiving message from extra peer: %s", pMsg->signalingMessage.peerClientId); CHK(FALSE, retStatus); } CHK_STATUS(pPeer->handleSignalingMsg(pMsg)); CleanUp: return retStatus; }; CHK_STATUS(createSignalingClientSync(&this->clientInfo, &channelInfo, &clientCallbacks, pAwsCredentialProvider, &signalingClientHandle)); CHK_STATUS(signalingClientFetchSync(signalingClientHandle)); CleanUp: return retStatus; } STATUS Peer::initRtcConfiguration(const Canary::PConfig pConfig) { auto awaitGetIceConfigInfoCount = [](SIGNALING_CLIENT_HANDLE signalingClientHandle, PUINT32 pIceConfigInfoCount) -> STATUS { STATUS retStatus = STATUS_SUCCESS; UINT64 elapsed = 0; CHK(IS_VALID_SIGNALING_CLIENT_HANDLE(signalingClientHandle) && pIceConfigInfoCount != NULL, STATUS_NULL_ARG); while (TRUE) { // Get the configuration count CHK_STATUS(signalingClientGetIceConfigInfoCount(signalingClientHandle, pIceConfigInfoCount)); // Return OK if we have some ice configs CHK(*pIceConfigInfoCount == 0, retStatus); // Check for timeout CHK_ERR(elapsed <= ASYNC_ICE_CONFIG_INFO_WAIT_TIMEOUT, STATUS_OPERATION_TIMED_OUT, "Couldn't retrieve ICE configurations in alotted time."); THREAD_SLEEP(ICE_CONFIG_INFO_POLL_PERIOD); elapsed += ICE_CONFIG_INFO_POLL_PERIOD; } CleanUp: return retStatus; }; STATUS retStatus = STATUS_SUCCESS; UINT32 i, j, iceConfigCount, uriCount; PIceConfigInfo pIceConfigInfo; PRtcConfiguration pConfiguration = &this->rtcConfiguration; MEMSET(pConfiguration, 0x00, SIZEOF(RtcConfiguration)); // Set this to custom callback to enable filtering of interfaces pConfiguration->kvsRtcConfiguration.iceSetInterfaceFilterFunc = NULL; if (pConfig->forceTurn.value) { pConfiguration->iceTransportPolicy = ICE_TRANSPORT_POLICY_RELAY; } // Set the STUN server if (pConfig->endpoint.value.empty()) { SNPRINTF(pConfiguration->iceServers[0].urls, MAX_ICE_CONFIG_URI_LEN, KINESIS_VIDEO_STUN_URL, pConfig->region.value.c_str()); } else { SNPRINTF(pConfiguration->iceServers[0].urls, MAX_ICE_CONFIG_URI_LEN, "stun:stun.%s:443", pConfig->endpoint.value.c_str()); } if (pConfig->useTurn.value) { // Set the URIs from the configuration CHK_STATUS(awaitGetIceConfigInfoCount(signalingClientHandle, &iceConfigCount)); /* signalingClientGetIceConfigInfoCount can return more than one turn server. Use only one to optimize * candidate gathering latency. But user can also choose to use more than 1 turn server. */ for (uriCount = 0, i = 0; i < MAX_TURN_SERVERS; i++) { CHK_STATUS(signalingClientGetIceConfigInfo(signalingClientHandle, i, &pIceConfigInfo)); for (j = 0; j < pIceConfigInfo->uriCount; j++) { CHECK(uriCount < MAX_ICE_SERVERS_COUNT); /* * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=udp" then ICE will try TURN over UDP * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS * if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=udp", it's currently ignored because sdk dont do * TURN over DTLS yet. if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=tcp" then ICE will try TURN over * TCP/TLS if configuration.iceServers[uriCount + 1].urls is "turn:ip:port" then ICE will try both TURN over UPD and TCP/TLS * * It's recommended to not pass too many TURN iceServers to configuration because it will slow down ice gathering in non-trickle * mode. */ STRNCPY(pConfiguration->iceServers[uriCount + 1].urls, pIceConfigInfo->uris[j], MAX_ICE_CONFIG_URI_LEN); STRNCPY(pConfiguration->iceServers[uriCount + 1].credential, pIceConfigInfo->password, MAX_ICE_CONFIG_CREDENTIAL_LEN); STRNCPY(pConfiguration->iceServers[uriCount + 1].username, pIceConfigInfo->userName, MAX_ICE_CONFIG_USER_NAME_LEN); uriCount++; } } } CleanUp: return retStatus; } STATUS Peer::initPeerConnection() { auto handleOnIceCandidate = [](UINT64 customData, PCHAR candidateJson) -> VOID { STATUS retStatus = STATUS_SUCCESS; auto pPeer = (PPeer) customData; SignalingMessage message; if (candidateJson == NULL) { DLOGD("ice candidate gathering finished"); pPeer->iceGatheringDone = TRUE; pPeer->cvar.notify_all(); } else if (pPeer->trickleIce) { message.messageType = SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE; STRCPY(message.payload, candidateJson); CHK_STATUS(pPeer->send(&message)); } CleanUp: CHK_LOG_ERR(retStatus); }; auto onConnectionStateChange = [](UINT64 customData, RTC_PEER_CONNECTION_STATE newState) -> VOID { auto pPeer = (PPeer) customData; DLOGI("New connection state %u", newState); switch (newState) { case RTC_PEER_CONNECTION_STATE_CONNECTING: pPeer->iceHolePunchingStartTime = GETTIME(); break; case RTC_PEER_CONNECTION_STATE_CONNECTED: { auto duration = (GETTIME() - pPeer->iceHolePunchingStartTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND; DLOGI("ICE hole punching took %lu ms", duration); Canary::Cloudwatch::getInstance().monitoring.pushICEHolePunchingDelay(duration, Aws::CloudWatch::Model::StandardUnit::Milliseconds); break; } case RTC_PEER_CONNECTION_STATE_FAILED: // TODO: Replace this with a proper error code. Since there's no way to get the actual error code // at this moment, STATUS_PEERCONNECTION_BASE seems to be the best error code. pPeer->status = STATUS_PEERCONNECTION_BASE; // explicit fallthrough case RTC_PEER_CONNECTION_STATE_CLOSED: // explicit fallthrough case RTC_PEER_CONNECTION_STATE_DISCONNECTED: // Let the higher level to terminate if (pPeer->callbacks.onDisconnected != NULL) { pPeer->callbacks.onDisconnected(); } break; default: break; } }; STATUS retStatus = STATUS_SUCCESS; CHK(this->pPeerConnection == NULL, STATUS_INVALID_OPERATION); CHK_STATUS(createPeerConnection(&this->rtcConfiguration, &this->pPeerConnection)); CHK_STATUS(peerConnectionOnIceCandidate(this->pPeerConnection, (UINT64) this, handleOnIceCandidate)); CHK_STATUS(peerConnectionOnConnectionStateChange(this->pPeerConnection, (UINT64) this, onConnectionStateChange)); if (this->callbacks.onNewConnection != NULL) { this->callbacks.onNewConnection(this); } CleanUp: return retStatus; } STATUS Peer::shutdown() { this->terminated = TRUE; this->cvar.notify_all(); { // lock to wait until awoken thread finish. std::lock_guard<std::recursive_mutex> lock(this->mutex); } if (this->pPeerConnection != NULL) { CHK_LOG_ERR(closePeerConnection(this->pPeerConnection)); } if (!this->isMaster && IS_VALID_SIGNALING_CLIENT_HANDLE(this->signalingClientHandle)) { CHK_LOG_ERR(signalingClientDeleteSync(this->signalingClientHandle)); } return this->status; } STATUS Peer::connect() { auto connectPeerConnection = [this]() -> STATUS { STATUS retStatus = STATUS_SUCCESS; RtcSessionDescriptionInit offerSDPInit; UINT32 buffLen; SignalingMessage msg; MEMSET(&offerSDPInit, 0, SIZEOF(offerSDPInit)); CHK_STATUS(createOffer(this->pPeerConnection, &offerSDPInit)); CHK_STATUS(setLocalDescription(this->pPeerConnection, &offerSDPInit)); if (!this->trickleIce) { CHK_STATUS(this->awaitIceGathering(&offerSDPInit)); } msg.messageType = SIGNALING_MESSAGE_TYPE_OFFER; CHK_STATUS(serializeSessionDescriptionInit(&offerSDPInit, NULL, &buffLen)); CHK_STATUS(serializeSessionDescriptionInit(&offerSDPInit, msg.payload, &buffLen)); CHK_STATUS(this->send(&msg)); CleanUp: return retStatus; }; STATUS retStatus = STATUS_SUCCESS; CHK_STATUS(signalingClientConnectSync(signalingClientHandle)); if (!this->isMaster) { this->foundPeerId = TRUE; this->peerId = DEFAULT_VIEWER_PEER_ID; CHK_STATUS(this->initPeerConnection()); CHK_STATUS(connectPeerConnection()); } CleanUp: return retStatus; } STATUS Peer::send(PSignalingMessage pMsg) { STATUS retStatus = STATUS_SUCCESS; if (this->foundPeerId.load()) { pMsg->version = SIGNALING_MESSAGE_CURRENT_VERSION; pMsg->correlationId[0] = '\0'; STRCPY(pMsg->peerClientId, peerId.c_str()); pMsg->payloadLen = (UINT32) STRLEN(pMsg->payload); CHK_STATUS(signalingClientSendMessageSync(this->signalingClientHandle, pMsg)); } else { // TODO: maybe queue messages when there's no peer id DLOGW("Peer id hasn't been found yet. Failed to send a signaling message"); } CleanUp: return retStatus; } STATUS Peer::awaitIceGathering(PRtcSessionDescriptionInit pSDPInit) { STATUS retStatus = STATUS_SUCCESS; std::unique_lock<std::recursive_mutex> lock(this->mutex); this->cvar.wait(lock, [this]() { return this->terminated.load() || this->iceGatheringDone.load(); }); CHK_WARN(!this->terminated.load(), STATUS_OPERATION_TIMED_OUT, "application terminated and candidate gathering still not done"); CHK_STATUS(peerConnectionGetLocalDescription(this->pPeerConnection, pSDPInit)); CleanUp: return retStatus; }; STATUS Peer::handleSignalingMsg(PReceivedSignalingMessage pMsg) { auto handleOffer = [this](SignalingMessage& msg) -> STATUS { STATUS retStatus = STATUS_SUCCESS; RtcSessionDescriptionInit offerSDPInit, answerSDPInit; NullableBool canTrickle; UINT32 buffLen; this->offerReceiveTimestamp = GETTIME(); if (!this->isMaster) { DLOGW("Unexpected message SIGNALING_MESSAGE_TYPE_OFFER"); CHK(FALSE, retStatus); } if (receivedOffer.exchange(TRUE)) { DLOGW("Offer already received, ignore new offer from client id %s", msg.peerClientId); CHK(FALSE, retStatus); } MEMSET(&offerSDPInit, 0, SIZEOF(offerSDPInit)); MEMSET(&answerSDPInit, 0, SIZEOF(answerSDPInit)); CHK_STATUS(deserializeSessionDescriptionInit(msg.payload, msg.payloadLen, &offerSDPInit)); CHK_STATUS(setRemoteDescription(this->pPeerConnection, &offerSDPInit)); canTrickle = canTrickleIceCandidates(this->pPeerConnection); /* cannot be null after setRemoteDescription */ CHECK(!NULLABLE_CHECK_EMPTY(canTrickle)); CHK_STATUS(createAnswer(this->pPeerConnection, &answerSDPInit)); CHK_STATUS(setLocalDescription(this->pPeerConnection, &answerSDPInit)); if (!canTrickle.value) { CHK_STATUS(this->awaitIceGathering(&answerSDPInit)); } msg.messageType = SIGNALING_MESSAGE_TYPE_ANSWER; CHK_STATUS(serializeSessionDescriptionInit(&answerSDPInit, NULL, &buffLen)); CHK_STATUS(serializeSessionDescriptionInit(&answerSDPInit, msg.payload, &buffLen)); CHK_STATUS(this->send(&msg)); CleanUp: return retStatus; }; auto handleAnswer = [this](SignalingMessage& msg) -> STATUS { STATUS retStatus = STATUS_SUCCESS; RtcSessionDescriptionInit answerSDPInit; if (this->isMaster) { DLOGW("Unexpected message SIGNALING_MESSAGE_TYPE_ANSWER"); } else if (receivedAnswer.exchange(TRUE)) { DLOGW("Offer already received, ignore new offer from client id %s", msg.peerClientId); } else { MEMSET(&answerSDPInit, 0x00, SIZEOF(RtcSessionDescriptionInit)); CHK_STATUS(deserializeSessionDescriptionInit(msg.payload, msg.payloadLen, &answerSDPInit)); CHK_STATUS(setRemoteDescription(this->pPeerConnection, &answerSDPInit)); } CleanUp: return retStatus; }; auto handleICECandidate = [this](SignalingMessage& msg) -> STATUS { STATUS retStatus = STATUS_SUCCESS; RtcIceCandidateInit iceCandidate; CHK_STATUS(deserializeRtcIceCandidateInit(msg.payload, msg.payloadLen, &iceCandidate)); CHK_STATUS(addIceCandidate(this->pPeerConnection, iceCandidate.candidate)); CleanUp: CHK_LOG_ERR(retStatus); return retStatus; }; STATUS retStatus = STATUS_SUCCESS; auto& msg = pMsg->signalingMessage; CHK(!this->terminated.load(), retStatus); switch (msg.messageType) { case SIGNALING_MESSAGE_TYPE_OFFER: CHK_STATUS(handleOffer(msg)); break; case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE: CHK_STATUS(handleICECandidate(msg)); break; case SIGNALING_MESSAGE_TYPE_ANSWER: CHK_STATUS(handleAnswer(msg)); break; default: DLOGW("Unknown message type %u", msg.messageType); break; } CleanUp: return retStatus; } STATUS Peer::addTransceiver(RtcMediaStreamTrack& track) { auto handleBandwidthEstimation = [](UINT64 customData, DOUBLE maxiumBitrate) -> VOID { UNUSED_PARAM(customData); // TODO: Probably reexpose or add metrics here directly DLOGV("received bitrate suggestion: %f", maxiumBitrate); }; auto handleVideoFrame = [](UINT64 customData, PFrame pFrame) -> VOID { PPeer pPeer = (Canary::PPeer)(customData); std::unique_lock<std::recursive_mutex> lock(pPeer->mutex); PBYTE frameDataPtr = pFrame->frameData + ANNEX_B_NALU_SIZE; UINT32 rawPacketSize = 0; // Get size of hex encoded data hexDecode((PCHAR) frameDataPtr, pFrame->size - ANNEX_B_NALU_SIZE, NULL, &rawPacketSize); PBYTE rawPacket = (PBYTE) MEMCALLOC(1, (rawPacketSize * SIZEOF(BYTE))); hexDecode((PCHAR) frameDataPtr, pFrame->size - ANNEX_B_NALU_SIZE, rawPacket, &rawPacketSize); // Extract the timestamp field from raw packet frameDataPtr = rawPacket; UINT64 receivedTs = getUnalignedInt64BigEndian((PINT64)(frameDataPtr)); frameDataPtr += SIZEOF(UINT64); UINT32 receivedSize = getUnalignedInt32BigEndian((PINT32)(frameDataPtr)); pPeer->endToEndMetricsContext.frameLatencyAvg = EMA_ACCUMULATOR_GET_NEXT(pPeer->endToEndMetricsContext.frameLatencyAvg, GETTIME() - receivedTs); // Do a size match of the raw packet. Since raw packet does not contain the NALu, the // comparison would be rawPacketSize + ANNEX_B_NALU_SIZE and the received size if (rawPacketSize + ANNEX_B_NALU_SIZE == receivedSize) { pPeer->endToEndMetricsContext.sizeMatchAvg = EMA_ACCUMULATOR_GET_NEXT(pPeer->endToEndMetricsContext.sizeMatchAvg, 1); } else { pPeer->endToEndMetricsContext.sizeMatchAvg = EMA_ACCUMULATOR_GET_NEXT(pPeer->endToEndMetricsContext.sizeMatchAvg, 0); } SAFE_MEMFREE(rawPacket); }; PRtcRtpTransceiver pTransceiver; STATUS retStatus = STATUS_SUCCESS; CHK_STATUS(::addTransceiver(pPeerConnection, &track, NULL, &pTransceiver)); if (track.kind == MEDIA_STREAM_TRACK_KIND_VIDEO) { this->videoTransceivers.push_back(pTransceiver); // As part of canaries, we will only be monitoring video transceiver as we do for every other metrics CHK_STATUS(transceiverOnFrame(pTransceiver, (UINT64) this, handleVideoFrame)); } else { this->audioTransceivers.push_back(pTransceiver); } CHK_STATUS(transceiverOnBandwidthEstimation(pTransceiver, (UINT64) this, handleBandwidthEstimation)); CleanUp: return retStatus; } STATUS Peer::addSupportedCodec(RTC_CODEC codec) { STATUS retStatus = STATUS_SUCCESS; CHK_STATUS(::addSupportedCodec(pPeerConnection, codec)); CleanUp: return retStatus; } STATUS Peer::writeFrame(PFrame pFrame, MEDIA_STREAM_TRACK_KIND kind) { STATUS retStatus = STATUS_SUCCESS; DOUBLE timeToFirstFrame; auto& transceivers = kind == MEDIA_STREAM_TRACK_KIND_VIDEO ? this->videoTransceivers : this->audioTransceivers; if (kind == MEDIA_STREAM_TRACK_KIND_VIDEO) { std::lock_guard<std::mutex> lock(this->countUpdateMutex); if (this->recorded.load()) { this->canaryOutgoingRTPMetricsContext.videoFramesGenerated = 0; this->canaryOutgoingRTPMetricsContext.videoBytesGenerated = 0; this->recorded = FALSE; } this->canaryOutgoingRTPMetricsContext.videoFramesGenerated++; this->canaryOutgoingRTPMetricsContext.videoBytesGenerated += pFrame->size; } for (auto& transceiver : transceivers) { retStatus = ::writeFrame(transceiver, pFrame); CHK (retStatus == STATUS_SRTP_NOT_READY_YET || retStatus == STATUS_SUCCESS, retStatus); if (STATUS_SUCCEEDED(retStatus) && this->firstFrame && this->isMaster) { this->firstFrame = FALSE; timeToFirstFrame = (DOUBLE) (GETTIME() - this->offerReceiveTimestamp) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND; DLOGD("Start up latency from offer receive to first frame write: %lf ms", timeToFirstFrame); Canary::Cloudwatch::getInstance().monitoring.pushTimeToFirstFrame(timeToFirstFrame, Aws::CloudWatch::Model::StandardUnit::Milliseconds); } else { retStatus = STATUS_SUCCESS; } } CleanUp: return retStatus; } STATUS Peer::populateOutgoingRtpMetricsContext() { DOUBLE currentDuration = 0; currentDuration = (DOUBLE)(this->canaryMetrics.timestamp - this->canaryOutgoingRTPMetricsContext.prevTs) / HUNDREDS_OF_NANOS_IN_A_SECOND; { std::lock_guard<std::mutex> lock(this->countUpdateMutex); this->canaryOutgoingRTPMetricsContext.framesPercentageDiscarded = ((DOUBLE)(this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.framesDiscardedOnSend - this->canaryOutgoingRTPMetricsContext.prevFramesDiscardedOnSend) / (DOUBLE) this->canaryOutgoingRTPMetricsContext.videoFramesGenerated) * 100.0; this->canaryOutgoingRTPMetricsContext.retxBytesPercentage = (((DOUBLE) this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.retransmittedBytesSent - (DOUBLE)(this->canaryOutgoingRTPMetricsContext.prevRetxBytesSent)) / (DOUBLE) this->canaryOutgoingRTPMetricsContext.videoBytesGenerated) * 100.0; } // This flag ensures the reset of video bytes count is done only when this flag is set this->recorded = TRUE; this->canaryOutgoingRTPMetricsContext.averageFramesSentPerSecond = ((DOUBLE)(this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.framesSent - (DOUBLE) this->canaryOutgoingRTPMetricsContext.prevFramesSent)) / currentDuration; this->canaryOutgoingRTPMetricsContext.nacksPerSecond = ((DOUBLE) this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.nackCount - this->canaryOutgoingRTPMetricsContext.prevNackCount) / currentDuration; this->canaryOutgoingRTPMetricsContext.prevFramesSent = this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.framesSent; this->canaryOutgoingRTPMetricsContext.prevTs = this->canaryMetrics.timestamp; this->canaryOutgoingRTPMetricsContext.prevFramesDiscardedOnSend = this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.framesDiscardedOnSend; this->canaryOutgoingRTPMetricsContext.prevNackCount = this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.nackCount; this->canaryOutgoingRTPMetricsContext.prevRetxBytesSent = this->canaryMetrics.rtcStatsObject.outboundRtpStreamStats.retransmittedBytesSent; return STATUS_SUCCESS; } STATUS Peer::populateIncomingRtpMetricsContext() { DOUBLE currentDuration = 0; currentDuration = (DOUBLE)(this->canaryMetrics.timestamp - this->canaryIncomingRTPMetricsContext.prevTs) / HUNDREDS_OF_NANOS_IN_A_SECOND; this->canaryIncomingRTPMetricsContext.packetReceiveRate = (DOUBLE)(this->canaryMetrics.rtcStatsObject.inboundRtpStreamStats.received.packetsReceived - this->canaryIncomingRTPMetricsContext.prevPacketsReceived) / currentDuration; this->canaryIncomingRTPMetricsContext.incomingBitRate = ((DOUBLE)(this->canaryMetrics.rtcStatsObject.inboundRtpStreamStats.bytesReceived - this->canaryIncomingRTPMetricsContext.prevBytesReceived) / currentDuration) / 0.008; this->canaryIncomingRTPMetricsContext.framesDroppedPerSecond = ((DOUBLE) this->canaryMetrics.rtcStatsObject.inboundRtpStreamStats.received.framesDropped - this->canaryIncomingRTPMetricsContext.prevFramesDropped) / currentDuration; this->canaryIncomingRTPMetricsContext.prevPacketsReceived = this->canaryMetrics.rtcStatsObject.inboundRtpStreamStats.received.packetsReceived; this->canaryIncomingRTPMetricsContext.prevBytesReceived = this->canaryMetrics.rtcStatsObject.inboundRtpStreamStats.bytesReceived; this->canaryIncomingRTPMetricsContext.prevFramesDropped = this->canaryMetrics.rtcStatsObject.inboundRtpStreamStats.received.framesDropped; this->canaryIncomingRTPMetricsContext.prevTs = this->canaryMetrics.timestamp; return STATUS_SUCCESS; } STATUS Peer::publishStatsForCanary(RTC_STATS_TYPE statsType) { STATUS retStatus = STATUS_SUCCESS; this->canaryMetrics.requestedTypeOfStats = statsType; switch (statsType) { case RTC_STATS_TYPE_OUTBOUND_RTP: if (!this->videoTransceivers.empty()) { CHK_LOG_ERR(::rtcPeerConnectionGetMetrics(this->pPeerConnection, this->videoTransceivers.back(), &this->canaryMetrics)); this->populateOutgoingRtpMetricsContext(); Canary::Cloudwatch::getInstance().monitoring.pushOutboundRtpStats(&this->canaryOutgoingRTPMetricsContext); } break; case RTC_STATS_TYPE_INBOUND_RTP: if (!this->videoTransceivers.empty()) { CHK_LOG_ERR(::rtcPeerConnectionGetMetrics(this->pPeerConnection, this->videoTransceivers.back(), &this->canaryMetrics)); this->populateIncomingRtpMetricsContext(); Canary::Cloudwatch::getInstance().monitoring.pushInboundRtpStats(&this->canaryIncomingRTPMetricsContext); } break; default: CHK(FALSE, STATUS_NOT_IMPLEMENTED); } CleanUp: return retStatus; } STATUS Peer::publishEndToEndMetrics() { std::unique_lock<std::recursive_mutex> lock(this->mutex); Canary::Cloudwatch::getInstance().monitoring.pushEndToEndMetrics(this->endToEndMetricsContext); return STATUS_SUCCESS; } STATUS Peer::publishRetryCount() { STATUS retStatus = STATUS_SUCCESS; Canary::Cloudwatch::getInstance().monitoring.pushRetryCount(this->clientInfo.stateMachineRetryCountReadOnly); CleanUp: return retStatus; } } // namespace Canary