#define LOG_CLASS "WebRtcSamples" #include "Samples.h" PSampleConfiguration gSampleConfiguration = NULL; VOID sigintHandler(INT32 sigNum) { UNUSED_PARAM(sigNum); if (gSampleConfiguration != NULL) { ATOMIC_STORE_BOOL(&gSampleConfiguration->interrupted, TRUE); CVAR_BROADCAST(gSampleConfiguration->cvar); } } VOID onDataChannelMessage(UINT64 customData, PRtcDataChannel pDataChannel, BOOL isBinary, PBYTE pMessage, UINT32 pMessageLen) { UNUSED_PARAM(customData); UNUSED_PARAM(pDataChannel); if (isBinary) { DLOGI("DataChannel Binary Message"); } else { DLOGI("DataChannel String Message: %.*s\n", pMessageLen, pMessage); } } VOID onDataChannel(UINT64 customData, PRtcDataChannel pRtcDataChannel) { DLOGI("New DataChannel has been opened %s \n", pRtcDataChannel->name); dataChannelOnMessage(pRtcDataChannel, customData, onDataChannelMessage); } VOID onConnectionStateChange(UINT64 customData, RTC_PEER_CONNECTION_STATE newState) { STATUS retStatus = STATUS_SUCCESS; PSampleStreamingSession pSampleStreamingSession = (PSampleStreamingSession) customData; CHK(pSampleStreamingSession != NULL && pSampleStreamingSession->pSampleConfiguration != NULL, STATUS_INTERNAL_ERROR); PSampleConfiguration pSampleConfiguration = pSampleStreamingSession->pSampleConfiguration; DLOGI("New connection state %u", newState); switch (newState) { case RTC_PEER_CONNECTION_STATE_CONNECTED: ATOMIC_STORE_BOOL(&pSampleConfiguration->connected, TRUE); CVAR_BROADCAST(pSampleConfiguration->cvar); if (STATUS_FAILED(retStatus = logSelectedIceCandidatesInformation(pSampleStreamingSession))) { DLOGW("Failed to get information about selected Ice candidates: 0x%08x", retStatus); } break; case RTC_PEER_CONNECTION_STATE_FAILED: // explicit fallthrough case RTC_PEER_CONNECTION_STATE_CLOSED: // explicit fallthrough case RTC_PEER_CONNECTION_STATE_DISCONNECTED: ATOMIC_STORE_BOOL(&pSampleStreamingSession->terminateFlag, TRUE); CVAR_BROADCAST(pSampleConfiguration->cvar); // explicit fallthrough default: ATOMIC_STORE_BOOL(&pSampleConfiguration->connected, FALSE); CVAR_BROADCAST(pSampleConfiguration->cvar); break; } CleanUp: CHK_LOG_ERR(retStatus); } STATUS signalingClientStateChanged(UINT64 customData, SIGNALING_CLIENT_STATE state) { UNUSED_PARAM(customData); STATUS retStatus = STATUS_SUCCESS; PCHAR pStateStr; signalingClientGetStateString(state, &pStateStr); DLOGV("Signaling client state changed to %d - '%s'", state, pStateStr); // Return success to continue return retStatus; } STATUS signalingClientError(UINT64 customData, STATUS status, PCHAR msg, UINT32 msgLen) { PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData; DLOGW("Signaling client generated an error 0x%08x - '%.*s'", status, msgLen, msg); // We will force re-create the signaling client on the following errors if (status == STATUS_SIGNALING_ICE_CONFIG_REFRESH_FAILED || status == STATUS_SIGNALING_RECONNECT_FAILED) { ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, TRUE); CVAR_BROADCAST(pSampleConfiguration->cvar); } return STATUS_SUCCESS; } STATUS logSelectedIceCandidatesInformation(PSampleStreamingSession pSampleStreamingSession) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; RtcStats rtcMetrics; CHK(pSampleStreamingSession != NULL, STATUS_NULL_ARG); rtcMetrics.requestedTypeOfStats = RTC_STATS_TYPE_LOCAL_CANDIDATE; CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcMetrics)); DLOGD("Local Candidate IP Address: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.address); DLOGD("Local Candidate type: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.candidateType); DLOGD("Local Candidate port: %d", rtcMetrics.rtcStatsObject.localIceCandidateStats.port); DLOGD("Local Candidate priority: %d", rtcMetrics.rtcStatsObject.localIceCandidateStats.priority); DLOGD("Local Candidate transport protocol: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.protocol); DLOGD("Local Candidate relay protocol: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.relayProtocol); DLOGD("Local Candidate Ice server source: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.url); rtcMetrics.requestedTypeOfStats = RTC_STATS_TYPE_REMOTE_CANDIDATE; CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcMetrics)); DLOGD("Remote Candidate IP Address: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.address); DLOGD("Remote Candidate type: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.candidateType); DLOGD("Remote Candidate port: %d", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.port); DLOGD("Remote Candidate priority: %d", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.priority); DLOGD("Remote Candidate transport protocol: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.protocol); CleanUp: LEAVES(); return retStatus; } STATUS handleAnswer(PSampleConfiguration pSampleConfiguration, PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage) { UNUSED_PARAM(pSampleConfiguration); STATUS retStatus = STATUS_SUCCESS; RtcSessionDescriptionInit answerSessionDescriptionInit; MEMSET(&answerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit)); CHK_STATUS(deserializeSessionDescriptionInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &answerSessionDescriptionInit)); CHK_STATUS(setRemoteDescription(pSampleStreamingSession->pPeerConnection, &answerSessionDescriptionInit)); CleanUp: CHK_LOG_ERR(retStatus); return retStatus; } PVOID mediaSenderRoutine(PVOID customData) { STATUS retStatus = STATUS_SUCCESS; PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData; TID videoSenderTid = INVALID_TID_VALUE, audioSenderTid = INVALID_TID_VALUE; MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); while (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->connected) && !ATOMIC_LOAD_BOOL(&pSampleConfiguration->appTerminateFlag)) { CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, 5 * HUNDREDS_OF_NANOS_IN_A_SECOND); } MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); CHK(!ATOMIC_LOAD_BOOL(&pSampleConfiguration->appTerminateFlag), retStatus); if (pSampleConfiguration->videoSource != NULL) { THREAD_CREATE(&videoSenderTid, pSampleConfiguration->videoSource, (PVOID) pSampleConfiguration); } if (pSampleConfiguration->audioSource != NULL) { THREAD_CREATE(&audioSenderTid, pSampleConfiguration->audioSource, (PVOID) pSampleConfiguration); } if (videoSenderTid != INVALID_TID_VALUE) { THREAD_JOIN(videoSenderTid, NULL); } if (audioSenderTid != INVALID_TID_VALUE) { THREAD_JOIN(audioSenderTid, NULL); } CleanUp: // clean the flag of the media thread. ATOMIC_STORE_BOOL(&pSampleConfiguration->mediaThreadStarted, FALSE); CHK_LOG_ERR(retStatus); return NULL; } STATUS handleOffer(PSampleConfiguration pSampleConfiguration, PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage) { STATUS retStatus = STATUS_SUCCESS; RtcSessionDescriptionInit offerSessionDescriptionInit; NullableBool canTrickle; BOOL mediaThreadStarted; CHK(pSampleConfiguration != NULL && pSignalingMessage != NULL, STATUS_NULL_ARG); MEMSET(&offerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit)); MEMSET(&pSampleStreamingSession->answerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit)); CHK_STATUS(deserializeSessionDescriptionInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &offerSessionDescriptionInit)); CHK_STATUS(setRemoteDescription(pSampleStreamingSession->pPeerConnection, &offerSessionDescriptionInit)); canTrickle = canTrickleIceCandidates(pSampleStreamingSession->pPeerConnection); /* cannot be null after setRemoteDescription */ CHECK(!NULLABLE_CHECK_EMPTY(canTrickle)); pSampleStreamingSession->remoteCanTrickleIce = canTrickle.value; CHK_STATUS(setLocalDescription(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit)); /* * If remote support trickle ice, send answer now. Otherwise answer will be sent once ice candidate gathering is complete. */ if (pSampleStreamingSession->remoteCanTrickleIce) { CHK_STATUS(createAnswer(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit)); CHK_STATUS(respondWithAnswer(pSampleStreamingSession)); DLOGD("time taken to send answer %" PRIu64 " ms", (GETTIME() - pSampleStreamingSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND); } mediaThreadStarted = ATOMIC_EXCHANGE_BOOL(&pSampleConfiguration->mediaThreadStarted, TRUE); if (!mediaThreadStarted) { THREAD_CREATE(&pSampleConfiguration->mediaSenderTid, mediaSenderRoutine, (PVOID) pSampleConfiguration); } // The audio video receive routine should be per streaming session if (pSampleConfiguration->receiveAudioVideoSource != NULL) { THREAD_CREATE(&pSampleStreamingSession->receiveAudioVideoSenderTid, pSampleConfiguration->receiveAudioVideoSource, (PVOID) pSampleStreamingSession); } CleanUp: CHK_LOG_ERR(retStatus); return retStatus; } STATUS sendSignalingMessage(PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pMessage) { STATUS retStatus = STATUS_SUCCESS; BOOL locked = FALSE; // Validate the input params CHK(pSampleStreamingSession != NULL && pSampleStreamingSession->pSampleConfiguration != NULL && pMessage != NULL, STATUS_NULL_ARG); CHK(IS_VALID_MUTEX_VALUE(pSampleStreamingSession->pSampleConfiguration->signalingSendMessageLock) && IS_VALID_SIGNALING_CLIENT_HANDLE(pSampleStreamingSession->pSampleConfiguration->signalingClientHandle), STATUS_INVALID_OPERATION); MUTEX_LOCK(pSampleStreamingSession->pSampleConfiguration->signalingSendMessageLock); locked = TRUE; CHK_STATUS(signalingClientSendMessageSync(pSampleStreamingSession->pSampleConfiguration->signalingClientHandle, pMessage)); CleanUp: if (locked) { MUTEX_UNLOCK(pSampleStreamingSession->pSampleConfiguration->signalingSendMessageLock); } CHK_LOG_ERR(retStatus); return retStatus; } STATUS respondWithAnswer(PSampleStreamingSession pSampleStreamingSession) { STATUS retStatus = STATUS_SUCCESS; SignalingMessage message; UINT32 buffLen = MAX_SIGNALING_MESSAGE_LEN; CHK_STATUS(serializeSessionDescriptionInit(&pSampleStreamingSession->answerSessionDescriptionInit, message.payload, &buffLen)); message.version = SIGNALING_MESSAGE_CURRENT_VERSION; message.messageType = SIGNALING_MESSAGE_TYPE_ANSWER; STRNCPY(message.peerClientId, pSampleStreamingSession->peerId, MAX_SIGNALING_CLIENT_ID_LEN); message.payloadLen = (UINT32) STRLEN(message.payload); message.correlationId[0] = '\0'; CHK_STATUS(sendSignalingMessage(pSampleStreamingSession, &message)); CleanUp: CHK_LOG_ERR(retStatus); return retStatus; } BOOL sampleFilterNetworkInterfaces(UINT64 customData, PCHAR networkInt) { UNUSED_PARAM(customData); BOOL useInterface = FALSE; if (STRNCMP(networkInt, (PCHAR) "eth0", ARRAY_SIZE("eth0")) == 0) { useInterface = TRUE; } DLOGD("%s %s", networkInt, (useInterface) ? ("allowed. Candidates to be gathered") : ("blocked. Candidates will not be gathered")); return useInterface; } VOID onIceCandidateHandler(UINT64 customData, PCHAR candidateJson) { STATUS retStatus = STATUS_SUCCESS; PSampleStreamingSession pSampleStreamingSession = (PSampleStreamingSession) customData; SignalingMessage message; CHK(pSampleStreamingSession != NULL, STATUS_NULL_ARG); if (candidateJson == NULL) { DLOGD("ice candidate gathering finished"); ATOMIC_STORE_BOOL(&pSampleStreamingSession->candidateGatheringDone, TRUE); // if application is master and non-trickle ice, send answer now. if (pSampleStreamingSession->pSampleConfiguration->channelInfo.channelRoleType == SIGNALING_CHANNEL_ROLE_TYPE_MASTER && !pSampleStreamingSession->remoteCanTrickleIce) { CHK_STATUS(createAnswer(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit)); CHK_STATUS(respondWithAnswer(pSampleStreamingSession)); DLOGD("time taken to send answer %" PRIu64 " ms", (GETTIME() - pSampleStreamingSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND); } else if (pSampleStreamingSession->pSampleConfiguration->channelInfo.channelRoleType == SIGNALING_CHANNEL_ROLE_TYPE_VIEWER && !pSampleStreamingSession->pSampleConfiguration->trickleIce) { CVAR_BROADCAST(pSampleStreamingSession->pSampleConfiguration->cvar); } } else if (pSampleStreamingSession->remoteCanTrickleIce && ATOMIC_LOAD_BOOL(&pSampleStreamingSession->peerIdReceived)) { message.version = SIGNALING_MESSAGE_CURRENT_VERSION; message.messageType = SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE; STRNCPY(message.peerClientId, pSampleStreamingSession->peerId, MAX_SIGNALING_CLIENT_ID_LEN); message.payloadLen = (UINT32) STRNLEN(candidateJson, MAX_SIGNALING_MESSAGE_LEN); STRNCPY(message.payload, candidateJson, message.payloadLen); message.correlationId[0] = '\0'; CHK_STATUS(sendSignalingMessage(pSampleStreamingSession, &message)); } CleanUp: CHK_LOG_ERR(retStatus); } STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcPeerConnection* ppRtcPeerConnection) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; RtcConfiguration configuration; UINT32 i, j, iceConfigCount, uriCount = 0, maxTurnServer = 1; PIceConfigInfo pIceConfigInfo; UINT64 data, curTime; PRtcCertificate pRtcCertificate = NULL; CHK(pSampleConfiguration != NULL && ppRtcPeerConnection != NULL, STATUS_NULL_ARG); MEMSET(&configuration, 0x00, SIZEOF(RtcConfiguration)); // Set this to custom callback to enable filtering of interfaces configuration.kvsRtcConfiguration.iceSetInterfaceFilterFunc = NULL; // Set the ICE mode explicitly configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL; // Set the STUN server SNPRINTF(configuration.iceServers[0].urls, MAX_ICE_CONFIG_URI_LEN, KINESIS_VIDEO_STUN_URL, pSampleConfiguration->channelInfo.pRegion); if (pSampleConfiguration->useTurn) { // Set the URIs from the configuration CHK_STATUS(signalingClientGetIceConfigInfoCount(pSampleConfiguration->signalingClientHandle, &iceConfigCount)); /* signalingClientGetIceConfigInfoCount can return more than one turn server. Use only one to optimize * candidate gathering latency. But user can also choose to use more than 1 turn server. */ for (uriCount = 0, i = 0; i < maxTurnServer; i++) { CHK_STATUS(signalingClientGetIceConfigInfo(pSampleConfiguration->signalingClientHandle, i, &pIceConfigInfo)); for (j = 0; j < pIceConfigInfo->uriCount; j++) { CHECK(uriCount < MAX_ICE_SERVERS_COUNT); /* * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=udp" then ICE will try TURN over UDP * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS * if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=udp", it's currently ignored because sdk dont do TURN * over DTLS yet. if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS * if configuration.iceServers[uriCount + 1].urls is "turn:ip:port" then ICE will try both TURN over UPD and TCP/TLS * * It's recommended to not pass too many TURN iceServers to configuration because it will slow down ice gathering in non-trickle mode. */ STRNCPY(configuration.iceServers[uriCount + 1].urls, pIceConfigInfo->uris[j], MAX_ICE_CONFIG_URI_LEN); STRNCPY(configuration.iceServers[uriCount + 1].credential, pIceConfigInfo->password, MAX_ICE_CONFIG_CREDENTIAL_LEN); STRNCPY(configuration.iceServers[uriCount + 1].username, pIceConfigInfo->userName, MAX_ICE_CONFIG_USER_NAME_LEN); uriCount++; } } } pSampleConfiguration->iceUriCount = uriCount + 1; // Check if we have any pregenerated certs and use them // NOTE: We are running under the config lock retStatus = stackQueueDequeue(pSampleConfiguration->pregeneratedCertificates, &data); CHK(retStatus == STATUS_SUCCESS || retStatus == STATUS_NOT_FOUND, retStatus); if (retStatus == STATUS_NOT_FOUND) { retStatus = STATUS_SUCCESS; } else { // Use the pre-generated cert and get rid of it to not reuse again pRtcCertificate = (PRtcCertificate) data; configuration.certificates[0] = *pRtcCertificate; } curTime = GETTIME(); CHK_STATUS(createPeerConnection(&configuration, ppRtcPeerConnection)); DLOGD("time taken to create peer connection %" PRIu64 " ms", (GETTIME() - curTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND); CleanUp: CHK_LOG_ERR(retStatus); // Free the certificate which can be NULL as we no longer need it and won't reuse freeRtcCertificate(pRtcCertificate); LEAVES(); return retStatus; } // Return ICE server stats for a specific streaming session STATUS gatherIceServerStats(PSampleStreamingSession pSampleStreamingSession) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; RtcStats rtcmetrics; UINT32 j = 0; rtcmetrics.requestedTypeOfStats = RTC_STATS_TYPE_ICE_SERVER; for (; j < pSampleStreamingSession->pSampleConfiguration->iceUriCount; j++) { rtcmetrics.rtcStatsObject.iceServerStats.iceServerIndex = j; CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcmetrics)); DLOGD("ICE Server URL: %s", rtcmetrics.rtcStatsObject.iceServerStats.url); DLOGD("ICE Server port: %d", rtcmetrics.rtcStatsObject.iceServerStats.port); DLOGD("ICE Server protocol: %s", rtcmetrics.rtcStatsObject.iceServerStats.protocol); DLOGD("Total requests sent:%" PRIu64, rtcmetrics.rtcStatsObject.iceServerStats.totalRequestsSent); DLOGD("Total responses received: %" PRIu64, rtcmetrics.rtcStatsObject.iceServerStats.totalResponsesReceived); DLOGD("Total round trip time: %" PRIu64 "ms", rtcmetrics.rtcStatsObject.iceServerStats.totalRoundTripTime / HUNDREDS_OF_NANOS_IN_A_MILLISECOND); } CleanUp: LEAVES(); return retStatus; } STATUS createSampleStreamingSession(PSampleConfiguration pSampleConfiguration, PCHAR peerId, BOOL isMaster, PSampleStreamingSession* ppSampleStreamingSession) { STATUS retStatus = STATUS_SUCCESS; RtcMediaStreamTrack videoTrack, audioTrack; PSampleStreamingSession pSampleStreamingSession = NULL; RtcRtpTransceiverInit audioRtpTransceiverInit; RtcRtpTransceiverInit videoRtpTransceiverInit; MEMSET(&videoTrack, 0x00, SIZEOF(RtcMediaStreamTrack)); MEMSET(&audioTrack, 0x00, SIZEOF(RtcMediaStreamTrack)); CHK(pSampleConfiguration != NULL && ppSampleStreamingSession != NULL, STATUS_NULL_ARG); CHK((isMaster && peerId != NULL) || !isMaster, STATUS_INVALID_ARG); pSampleStreamingSession = (PSampleStreamingSession) MEMCALLOC(1, SIZEOF(SampleStreamingSession)); CHK(pSampleStreamingSession != NULL, STATUS_NOT_ENOUGH_MEMORY); if (isMaster) { STRCPY(pSampleStreamingSession->peerId, peerId); } else { STRCPY(pSampleStreamingSession->peerId, SAMPLE_VIEWER_CLIENT_ID); } ATOMIC_STORE_BOOL(&pSampleStreamingSession->peerIdReceived, TRUE); pSampleStreamingSession->pSampleConfiguration = pSampleConfiguration; pSampleStreamingSession->rtcMetricsHistory.prevTs = GETTIME(); // if we're the viewer, we control the trickle ice mode pSampleStreamingSession->remoteCanTrickleIce = !isMaster && pSampleConfiguration->trickleIce; ATOMIC_STORE_BOOL(&pSampleStreamingSession->terminateFlag, FALSE); ATOMIC_STORE_BOOL(&pSampleStreamingSession->candidateGatheringDone, FALSE); CHK_STATUS(initializePeerConnection(pSampleConfiguration, &pSampleStreamingSession->pPeerConnection)); CHK_STATUS(peerConnectionOnIceCandidate(pSampleStreamingSession->pPeerConnection, (UINT64) pSampleStreamingSession, onIceCandidateHandler)); CHK_STATUS( peerConnectionOnConnectionStateChange(pSampleStreamingSession->pPeerConnection, (UINT64) pSampleStreamingSession, onConnectionStateChange)); if (pSampleConfiguration->onDataChannel != NULL) { CHK_STATUS(peerConnectionOnDataChannel(pSampleStreamingSession->pPeerConnection, (UINT64) pSampleStreamingSession, pSampleConfiguration->onDataChannel)); } // Declare that we support H264,Profile=42E01F,level-asymmetry-allowed=1,packetization-mode=1 and Opus CHK_STATUS(addSupportedCodec(pSampleStreamingSession->pPeerConnection, RTC_CODEC_H264_PROFILE_42E01F_LEVEL_ASYMMETRY_ALLOWED_PACKETIZATION_MODE)); CHK_STATUS(addSupportedCodec(pSampleStreamingSession->pPeerConnection, RTC_CODEC_OPUS)); // Add a SendRecv Transceiver of type video videoTrack.kind = MEDIA_STREAM_TRACK_KIND_VIDEO; videoTrack.codec = RTC_CODEC_H264_PROFILE_42E01F_LEVEL_ASYMMETRY_ALLOWED_PACKETIZATION_MODE; videoRtpTransceiverInit.direction = RTC_RTP_TRANSCEIVER_DIRECTION_SENDRECV; STRCPY(videoTrack.streamId, "myKvsVideoStream"); STRCPY(videoTrack.trackId, "myVideoTrack"); CHK_STATUS(addTransceiver(pSampleStreamingSession->pPeerConnection, &videoTrack, &videoRtpTransceiverInit, &pSampleStreamingSession->pVideoRtcRtpTransceiver)); CHK_STATUS(transceiverOnBandwidthEstimation(pSampleStreamingSession->pVideoRtcRtpTransceiver, (UINT64) pSampleStreamingSession, sampleBandwidthEstimationHandler)); // Add a SendRecv Transceiver of type video audioTrack.kind = MEDIA_STREAM_TRACK_KIND_AUDIO; audioTrack.codec = RTC_CODEC_OPUS; audioRtpTransceiverInit.direction = RTC_RTP_TRANSCEIVER_DIRECTION_SENDRECV; STRCPY(audioTrack.streamId, "myKvsVideoStream"); STRCPY(audioTrack.trackId, "myAudioTrack"); CHK_STATUS(addTransceiver(pSampleStreamingSession->pPeerConnection, &audioTrack, &audioRtpTransceiverInit, &pSampleStreamingSession->pAudioRtcRtpTransceiver)); CHK_STATUS(transceiverOnBandwidthEstimation(pSampleStreamingSession->pAudioRtcRtpTransceiver, (UINT64) pSampleStreamingSession, sampleBandwidthEstimationHandler)); // twcc bandwidth estimation CHK_STATUS(peerConnectionOnSenderBandwidthEstimation(pSampleStreamingSession->pPeerConnection, (UINT64) pSampleStreamingSession, sampleSenderBandwidthEstimationHandler)); pSampleStreamingSession->firstFrame = TRUE; pSampleStreamingSession->startUpLatency = 0; CleanUp: if (STATUS_FAILED(retStatus) && pSampleStreamingSession != NULL) { freeSampleStreamingSession(&pSampleStreamingSession); pSampleStreamingSession = NULL; } if (ppSampleStreamingSession != NULL) { *ppSampleStreamingSession = pSampleStreamingSession; } return retStatus; } STATUS freeSampleStreamingSession(PSampleStreamingSession* ppSampleStreamingSession) { STATUS retStatus = STATUS_SUCCESS; PSampleStreamingSession pSampleStreamingSession = NULL; PSampleConfiguration pSampleConfiguration; CHK(ppSampleStreamingSession != NULL, STATUS_NULL_ARG); pSampleStreamingSession = *ppSampleStreamingSession; CHK(pSampleStreamingSession != NULL && pSampleStreamingSession->pSampleConfiguration != NULL, retStatus); pSampleConfiguration = pSampleStreamingSession->pSampleConfiguration; DLOGD("Freeing streaming session with peer id: %s ", pSampleStreamingSession->peerId); ATOMIC_STORE_BOOL(&pSampleStreamingSession->terminateFlag, TRUE); if (pSampleStreamingSession->shutdownCallback != NULL) { pSampleStreamingSession->shutdownCallback(pSampleStreamingSession->shutdownCallbackCustomData, pSampleStreamingSession); } if (IS_VALID_TID_VALUE(pSampleStreamingSession->receiveAudioVideoSenderTid)) { THREAD_JOIN(pSampleStreamingSession->receiveAudioVideoSenderTid, NULL); } // De-initialize the session stats timer if there are no active sessions // NOTE: we need to perform this under the lock which might be acquired by // the running thread but it's OK as it's re-entrant MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); if (pSampleConfiguration->iceCandidatePairStatsTimerId != MAX_UINT32 && pSampleConfiguration->streamingSessionCount == 0 && pSampleConfiguration->iceCandidatePairStatsTimerId != MAX_UINT32) { CHK_LOG_ERR(timerQueueCancelTimer(pSampleConfiguration->timerQueueHandle, pSampleConfiguration->iceCandidatePairStatsTimerId, (UINT64) pSampleConfiguration)); pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32; } MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); CHK_LOG_ERR(closePeerConnection(pSampleStreamingSession->pPeerConnection)); CHK_LOG_ERR(freePeerConnection(&pSampleStreamingSession->pPeerConnection)); SAFE_MEMFREE(pSampleStreamingSession); CleanUp: CHK_LOG_ERR(retStatus); return retStatus; } STATUS streamingSessionOnShutdown(PSampleStreamingSession pSampleStreamingSession, UINT64 customData, StreamSessionShutdownCallback streamSessionShutdownCallback) { STATUS retStatus = STATUS_SUCCESS; CHK(pSampleStreamingSession != NULL && streamSessionShutdownCallback != NULL, STATUS_NULL_ARG); pSampleStreamingSession->shutdownCallbackCustomData = customData; pSampleStreamingSession->shutdownCallback = streamSessionShutdownCallback; CleanUp: return retStatus; } VOID sampleFrameHandler(UINT64 customData, PFrame pFrame) { UNUSED_PARAM(customData); DLOGV("Frame received. TrackId: %" PRIu64 ", Size: %u, Flags %u", pFrame->trackId, pFrame->size, pFrame->flags); PSampleStreamingSession pSampleStreamingSession = (PSampleStreamingSession) customData; if (pSampleStreamingSession->firstFrame) { pSampleStreamingSession->firstFrame = FALSE; pSampleStreamingSession->startUpLatency = (GETTIME() - pSampleStreamingSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND; printf("Start up latency from offer to first frame: %" PRIu64 "ms\n", pSampleStreamingSession->startUpLatency); } } VOID sampleBandwidthEstimationHandler(UINT64 customData, DOUBLE maxiumBitrate) { UNUSED_PARAM(customData); DLOGV("received bitrate suggestion: %f", maxiumBitrate); } VOID sampleSenderBandwidthEstimationHandler(UINT64 customData, UINT32 txBytes, UINT32 rxBytes, UINT32 txPacketsCnt, UINT32 rxPacketsCnt, UINT64 duration) { UNUSED_PARAM(customData); UNUSED_PARAM(duration); UNUSED_PARAM(rxBytes); UNUSED_PARAM(txBytes); UINT32 lostPacketsCnt = txPacketsCnt - rxPacketsCnt; UINT32 percentLost = lostPacketsCnt * 100 / txPacketsCnt; UINT32 bitrate = 1024; if (percentLost < 2) { // increase encoder bitrate by 2 percent bitrate *= 1.02f; } else if (percentLost > 5) { // decrease encoder bitrate by packet loss percent bitrate *= (1.0f - percentLost / 100.0f); } // otherwise keep bitrate the same DLOGS("received sender bitrate estimation: suggested bitrate %u sent: %u bytes %u packets received: %u bytes %u packets in %lu msec, ", bitrate, txBytes, txPacketsCnt, rxBytes, rxPacketsCnt, duration / 10000ULL); } STATUS handleRemoteCandidate(PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage) { STATUS retStatus = STATUS_SUCCESS; RtcIceCandidateInit iceCandidate; CHK(pSampleStreamingSession != NULL && pSignalingMessage != NULL, STATUS_NULL_ARG); CHK_STATUS(deserializeRtcIceCandidateInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &iceCandidate)); CHK_STATUS(addIceCandidate(pSampleStreamingSession->pPeerConnection, iceCandidate.candidate)); CleanUp: CHK_LOG_ERR(retStatus); return retStatus; } STATUS traverseDirectoryPEMFileScan(UINT64 customData, DIR_ENTRY_TYPES entryType, PCHAR fullPath, PCHAR fileName) { UNUSED_PARAM(entryType); UNUSED_PARAM(fullPath); PCHAR certName = (PCHAR) customData; UINT32 fileNameLen = STRLEN(fileName); if (fileNameLen > ARRAY_SIZE(CA_CERT_PEM_FILE_EXTENSION) + 1 && (STRCMPI(CA_CERT_PEM_FILE_EXTENSION, &fileName[fileNameLen - ARRAY_SIZE(CA_CERT_PEM_FILE_EXTENSION) + 1]) == 0)) { certName[0] = FPATHSEPARATOR; certName++; STRCPY(certName, fileName); } return STATUS_SUCCESS; } STATUS lookForSslCert(PSampleConfiguration* ppSampleConfiguration) { STATUS retStatus = STATUS_SUCCESS; struct stat pathStat; CHAR certName[MAX_PATH_LEN]; PSampleConfiguration pSampleConfiguration = *ppSampleConfiguration; MEMSET(certName, 0x0, ARRAY_SIZE(certName)); pSampleConfiguration->pCaCertPath = getenv(CACERT_PATH_ENV_VAR); // if ca cert path is not set from the environment, try to use the one that cmake detected if (pSampleConfiguration->pCaCertPath == NULL) { CHK_ERR(STRNLEN(DEFAULT_KVS_CACERT_PATH, MAX_PATH_LEN) > 0, STATUS_INVALID_OPERATION, "No ca cert path given (error:%s)", strerror(errno)); pSampleConfiguration->pCaCertPath = DEFAULT_KVS_CACERT_PATH; } else { // Check if the environment variable is a path CHK(0 == FSTAT(pSampleConfiguration->pCaCertPath, &pathStat), STATUS_DIRECTORY_ENTRY_STAT_ERROR); if (S_ISDIR(pathStat.st_mode)) { CHK_STATUS(traverseDirectory(pSampleConfiguration->pCaCertPath, (UINT64) &certName, /* iterate */ FALSE, traverseDirectoryPEMFileScan)); if (certName[0] != 0x0) { STRCAT(pSampleConfiguration->pCaCertPath, certName); } else { DLOGW("Cert not found in path set...checking if CMake detected a path\n"); CHK_ERR(STRNLEN(DEFAULT_KVS_CACERT_PATH, MAX_PATH_LEN) > 0, STATUS_INVALID_OPERATION, "No ca cert path given (error:%s)", strerror(errno)); DLOGI("CMake detected cert path\n"); pSampleConfiguration->pCaCertPath = DEFAULT_KVS_CACERT_PATH; } } } CleanUp: CHK_LOG_ERR(retStatus); return retStatus; } STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE roleType, BOOL trickleIce, BOOL useTurn, PSampleConfiguration* ppSampleConfiguration) { STATUS retStatus = STATUS_SUCCESS; PCHAR pAccessKey, pSecretKey, pSessionToken, pLogLevel; PSampleConfiguration pSampleConfiguration = NULL; UINT32 logLevel = LOG_LEVEL_DEBUG; CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG); CHK(NULL != (pSampleConfiguration = (PSampleConfiguration) MEMCALLOC(1, SIZEOF(SampleConfiguration))), STATUS_NOT_ENOUGH_MEMORY); #ifdef IOT_CORE_ENABLE_CREDENTIALS PCHAR pIotCoreCredentialEndPoint, pIotCoreCert, pIotCorePrivateKey, pIotCoreRoleAlias, pIotCoreThingName; CHK_ERR((pIotCoreCredentialEndPoint = getenv(IOT_CORE_CREDENTIAL_ENDPOINT)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_CREDENTIAL_ENDPOINT must be set"); CHK_ERR((pIotCoreCert = getenv(IOT_CORE_CERT)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_CERT must be set"); CHK_ERR((pIotCorePrivateKey = getenv(IOT_CORE_PRIVATE_KEY)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_PRIVATE_KEY must be set"); CHK_ERR((pIotCoreRoleAlias = getenv(IOT_CORE_ROLE_ALIAS)) != NULL, STATUS_INVALID_OPERATION, "AWS_IOT_CORE_ROLE_ALIAS must be set"); #else CHK_ERR((pAccessKey = getenv(ACCESS_KEY_ENV_VAR)) != NULL, STATUS_INVALID_OPERATION, "AWS_ACCESS_KEY_ID must be set"); CHK_ERR((pSecretKey = getenv(SECRET_KEY_ENV_VAR)) != NULL, STATUS_INVALID_OPERATION, "AWS_SECRET_ACCESS_KEY must be set"); #endif pSessionToken = getenv(SESSION_TOKEN_ENV_VAR); pSampleConfiguration->enableFileLogging = FALSE; if (NULL != getenv(ENABLE_FILE_LOGGING)) { pSampleConfiguration->enableFileLogging = TRUE; } if ((pSampleConfiguration->channelInfo.pRegion = getenv(DEFAULT_REGION_ENV_VAR)) == NULL) { pSampleConfiguration->channelInfo.pRegion = DEFAULT_AWS_REGION; } CHK_STATUS(lookForSslCert(&pSampleConfiguration)); // Set the logger log level if (NULL == (pLogLevel = getenv(DEBUG_LOG_LEVEL_ENV_VAR)) || STATUS_SUCCESS != STRTOUI32(pLogLevel, NULL, 10, &logLevel) || logLevel < LOG_LEVEL_VERBOSE || logLevel > LOG_LEVEL_SILENT) { logLevel = LOG_LEVEL_WARN; } SET_LOGGER_LOG_LEVEL(logLevel); #ifdef IOT_CORE_ENABLE_CREDENTIALS CHK_STATUS(createLwsIotCredentialProvider(pIotCoreCredentialEndPoint, pIotCoreCert, pIotCorePrivateKey, pSampleConfiguration->pCaCertPath, pIotCoreRoleAlias, channelName, &pSampleConfiguration->pCredentialProvider)); #else CHK_STATUS( createStaticCredentialProvider(pAccessKey, 0, pSecretKey, 0, pSessionToken, 0, MAX_UINT64, &pSampleConfiguration->pCredentialProvider)); #endif pSampleConfiguration->mediaSenderTid = INVALID_TID_VALUE; pSampleConfiguration->signalingClientHandle = INVALID_SIGNALING_CLIENT_HANDLE_VALUE; pSampleConfiguration->sampleConfigurationObjLock = MUTEX_CREATE(TRUE); pSampleConfiguration->cvar = CVAR_CREATE(); pSampleConfiguration->streamingSessionListReadLock = MUTEX_CREATE(FALSE); pSampleConfiguration->signalingSendMessageLock = MUTEX_CREATE(FALSE); /* This is ignored for master. Master can extract the info from offer. Viewer has to know if peer can trickle or * not ahead of time. */ pSampleConfiguration->trickleIce = trickleIce; pSampleConfiguration->useTurn = useTurn; pSampleConfiguration->channelInfo.version = CHANNEL_INFO_CURRENT_VERSION; pSampleConfiguration->channelInfo.pChannelName = channelName; pSampleConfiguration->channelInfo.pKmsKeyId = NULL; pSampleConfiguration->channelInfo.tagCount = 0; pSampleConfiguration->channelInfo.pTags = NULL; pSampleConfiguration->channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER; pSampleConfiguration->channelInfo.channelRoleType = roleType; pSampleConfiguration->channelInfo.cachingPolicy = SIGNALING_API_CALL_CACHE_TYPE_FILE; pSampleConfiguration->channelInfo.cachingPeriod = SIGNALING_API_CALL_CACHE_TTL_SENTINEL_VALUE; pSampleConfiguration->channelInfo.asyncIceServerConfig = TRUE; // has no effect pSampleConfiguration->channelInfo.retry = TRUE; pSampleConfiguration->channelInfo.reconnect = TRUE; pSampleConfiguration->channelInfo.pCertPath = pSampleConfiguration->pCaCertPath; pSampleConfiguration->channelInfo.messageTtl = 0; // Default is 60 seconds pSampleConfiguration->signalingClientCallbacks.version = SIGNALING_CLIENT_CALLBACKS_CURRENT_VERSION; pSampleConfiguration->signalingClientCallbacks.errorReportFn = signalingClientError; pSampleConfiguration->signalingClientCallbacks.stateChangeFn = signalingClientStateChanged; pSampleConfiguration->signalingClientCallbacks.customData = (UINT64) pSampleConfiguration; pSampleConfiguration->clientInfo.version = SIGNALING_CLIENT_INFO_CURRENT_VERSION; pSampleConfiguration->clientInfo.loggingLevel = logLevel; pSampleConfiguration->clientInfo.cacheFilePath = NULL; // Use the default path pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32; pSampleConfiguration->pregenerateCertTimerId = MAX_UINT32; ATOMIC_STORE_BOOL(&pSampleConfiguration->interrupted, FALSE); ATOMIC_STORE_BOOL(&pSampleConfiguration->mediaThreadStarted, FALSE); ATOMIC_STORE_BOOL(&pSampleConfiguration->appTerminateFlag, FALSE); ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE); ATOMIC_STORE_BOOL(&pSampleConfiguration->connected, FALSE); CHK_STATUS(timerQueueCreate(&pSampleConfiguration->timerQueueHandle)); CHK_STATUS(stackQueueCreate(&pSampleConfiguration->pregeneratedCertificates)); // Start the cert pre-gen timer callback if (SAMPLE_PRE_GENERATE_CERT) { CHK_LOG_ERR(retStatus = timerQueueAddTimer(pSampleConfiguration->timerQueueHandle, 0, SAMPLE_PRE_GENERATE_CERT_PERIOD, pregenerateCertTimerCallback, (UINT64) pSampleConfiguration, &pSampleConfiguration->pregenerateCertTimerId)); } pSampleConfiguration->iceUriCount = 0; CHK_STATUS(stackQueueCreate(&pSampleConfiguration->pPendingSignalingMessageForRemoteClient)); CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH, &pSampleConfiguration->pRtcPeerConnectionForRemoteClient)); CleanUp: if (STATUS_FAILED(retStatus)) { freeSampleConfiguration(&pSampleConfiguration); } if (ppSampleConfiguration != NULL) { *ppSampleConfiguration = pSampleConfiguration; } return retStatus; } STATUS logSignalingClientStats(PSignalingClientMetrics pSignalingClientMetrics) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; CHK(pSignalingClientMetrics != NULL, STATUS_NULL_ARG); DLOGD("Signaling client connection duration: %" PRIu64 " ms", (pSignalingClientMetrics->signalingClientStats.connectionDuration / HUNDREDS_OF_NANOS_IN_A_MILLISECOND)); DLOGD("Number of signaling client API errors: %d", pSignalingClientMetrics->signalingClientStats.numberOfErrors); DLOGD("Number of runtime errors in the session: %d", pSignalingClientMetrics->signalingClientStats.numberOfRuntimeErrors); DLOGD("Signaling client uptime: %" PRIu64 " ms", (pSignalingClientMetrics->signalingClientStats.connectionDuration / HUNDREDS_OF_NANOS_IN_A_MILLISECOND)); // This gives the EMA of the createChannel, describeChannel, getChannelEndpoint and deleteChannel calls DLOGD("Control Plane API call latency: %" PRIu64 " ms", (pSignalingClientMetrics->signalingClientStats.cpApiCallLatency / HUNDREDS_OF_NANOS_IN_A_MILLISECOND)); // This gives the EMA of the getIceConfig() call. DLOGD("Data Plane API call latency: %" PRIu64 " ms", (pSignalingClientMetrics->signalingClientStats.dpApiCallLatency / HUNDREDS_OF_NANOS_IN_A_MILLISECOND)); CleanUp: LEAVES(); return retStatus; } STATUS getIceCandidatePairStatsCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData) { UNUSED_PARAM(timerId); UNUSED_PARAM(currentTime); STATUS retStatus = STATUS_SUCCESS; PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData; UINT32 i; UINT64 currentMeasureDuration = 0; DOUBLE averagePacketsDiscardedOnSend = 0.0; DOUBLE averageNumberOfPacketsSentPerSecond = 0.0; DOUBLE averageNumberOfPacketsReceivedPerSecond = 0.0; DOUBLE outgoingBitrate = 0.0; DOUBLE incomingBitrate = 0.0; BOOL locked = FALSE; CHK_WARN(pSampleConfiguration != NULL, STATUS_NULL_ARG, "[KVS Master] getPeriodicStats(): Passed argument is NULL"); pSampleConfiguration->rtcIceCandidatePairMetrics.requestedTypeOfStats = RTC_STATS_TYPE_CANDIDATE_PAIR; // We need to execute this under the object lock due to race conditions that it could pose MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = TRUE; for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) { if (STATUS_SUCCEEDED(rtcPeerConnectionGetMetrics(pSampleConfiguration->sampleStreamingSessionList[i]->pPeerConnection, NULL, &pSampleConfiguration->rtcIceCandidatePairMetrics))) { currentMeasureDuration = (pSampleConfiguration->rtcIceCandidatePairMetrics.timestamp - pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevTs) / HUNDREDS_OF_NANOS_IN_A_SECOND; DLOGD("Current duration: %" PRIu64 " seconds", currentMeasureDuration); if (currentMeasureDuration > 0) { DLOGD("Selected local candidate ID: %s", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.localCandidateId); DLOGD("Selected remote candidate ID: %s", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.remoteCandidateId); // TODO: Display state as a string for readability DLOGD("Ice Candidate Pair state: %d", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.state); DLOGD("Nomination state: %s", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.nominated ? "nominated" : "not nominated"); averageNumberOfPacketsSentPerSecond = (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsSent - pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsSent) / (DOUBLE) currentMeasureDuration; DLOGD("Packet send rate: %lf pkts/sec", averageNumberOfPacketsSentPerSecond); averageNumberOfPacketsReceivedPerSecond = (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsReceived - pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsReceived) / (DOUBLE) currentMeasureDuration; DLOGD("Packet receive rate: %lf pkts/sec", averageNumberOfPacketsReceivedPerSecond); outgoingBitrate = (DOUBLE)((pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesSent - pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesSent) * 8.0) / currentMeasureDuration; DLOGD("Outgoing bit rate: %lf bps", outgoingBitrate); incomingBitrate = (DOUBLE)((pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesReceived - pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesReceived) * 8.0) / currentMeasureDuration; DLOGD("Incoming bit rate: %lf bps", incomingBitrate); averagePacketsDiscardedOnSend = (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsDiscardedOnSend - pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevPacketsDiscardedOnSend) / (DOUBLE) currentMeasureDuration; DLOGD("Packet discard rate: %lf pkts/sec", averagePacketsDiscardedOnSend); DLOGD("Current STUN request round trip time: %lf sec", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.currentRoundTripTime); DLOGD("Number of STUN responses received: %llu", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.responsesReceived); pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevTs = pSampleConfiguration->rtcIceCandidatePairMetrics.timestamp; pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsSent = pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsSent; pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsReceived = pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsReceived; pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesSent = pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesSent; pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesReceived = pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesReceived; pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevPacketsDiscardedOnSend = pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsDiscardedOnSend; } } } CleanUp: if (locked) { MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); } return retStatus; } STATUS pregenerateCertTimerCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData) { UNUSED_PARAM(timerId); UNUSED_PARAM(currentTime); STATUS retStatus = STATUS_SUCCESS; PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData; BOOL locked = FALSE; UINT32 certCount; PRtcCertificate pRtcCertificate = NULL; CHK_WARN(pSampleConfiguration != NULL, STATUS_NULL_ARG, "[KVS Master] pregenerateCertTimerCallback(): Passed argument is NULL"); MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = TRUE; // Quick check if there is anything that needs to be done. CHK_STATUS(stackQueueGetCount(pSampleConfiguration->pregeneratedCertificates, &certCount)); CHK(certCount != MAX_RTCCONFIGURATION_CERTIFICATES, retStatus); // Generate the certificate with the keypair CHK_STATUS(createRtcCertificate(&pRtcCertificate)); // Add to the stack queue CHK_STATUS(stackQueueEnqueue(pSampleConfiguration->pregeneratedCertificates, (UINT64) pRtcCertificate)); DLOGV("New certificate has been pre-generated and added to the queue"); // Reset it so it won't be freed on exit pRtcCertificate = NULL; MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = FALSE; CleanUp: if (pRtcCertificate != NULL) { freeRtcCertificate(pRtcCertificate); } if (locked) { MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); } return retStatus; } STATUS freeSampleConfiguration(PSampleConfiguration* ppSampleConfiguration) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; PSampleConfiguration pSampleConfiguration; UINT32 i; UINT64 data; StackQueueIterator iterator; BOOL locked = FALSE; CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG); pSampleConfiguration = *ppSampleConfiguration; CHK(pSampleConfiguration != NULL, retStatus); if (pSampleConfiguration->pPendingSignalingMessageForRemoteClient != NULL) { // Iterate and free all the pending queues stackQueueGetIterator(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, &iterator); while (IS_VALID_ITERATOR(iterator)) { stackQueueIteratorGetItem(iterator, &data); stackQueueIteratorNext(&iterator); freeMessageQueue((PPendingMessageQueue) data); } stackQueueClear(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, FALSE); stackQueueFree(pSampleConfiguration->pPendingSignalingMessageForRemoteClient); pSampleConfiguration->pPendingSignalingMessageForRemoteClient = NULL; } hashTableClear(pSampleConfiguration->pRtcPeerConnectionForRemoteClient); hashTableFree(pSampleConfiguration->pRtcPeerConnectionForRemoteClient); if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) { MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = TRUE; } for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) { retStatus = gatherIceServerStats(pSampleConfiguration->sampleStreamingSessionList[i]); if (STATUS_FAILED(retStatus)) { DLOGW("Failed to ICE Server Stats for streaming session %d: %08x", i, retStatus); } freeSampleStreamingSession(&pSampleConfiguration->sampleStreamingSessionList[i]); } if (locked) { MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); } deinitKvsWebRtc(); SAFE_MEMFREE(pSampleConfiguration->pVideoFrameBuffer); SAFE_MEMFREE(pSampleConfiguration->pAudioFrameBuffer); if (IS_VALID_CVAR_VALUE(pSampleConfiguration->cvar) && IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) { CVAR_BROADCAST(pSampleConfiguration->cvar); MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); } if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) { MUTEX_FREE(pSampleConfiguration->sampleConfigurationObjLock); } if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->streamingSessionListReadLock)) { MUTEX_FREE(pSampleConfiguration->streamingSessionListReadLock); } if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->signalingSendMessageLock)) { MUTEX_FREE(pSampleConfiguration->signalingSendMessageLock); } if (IS_VALID_CVAR_VALUE(pSampleConfiguration->cvar)) { CVAR_FREE(pSampleConfiguration->cvar); } #ifdef IOT_CORE_ENABLE_CREDENTIALS freeIotCredentialProvider(&pSampleConfiguration->pCredentialProvider); #else freeStaticCredentialProvider(&pSampleConfiguration->pCredentialProvider); #endif if (IS_VALID_TIMER_QUEUE_HANDLE(pSampleConfiguration->timerQueueHandle)) { if (pSampleConfiguration->iceCandidatePairStatsTimerId != MAX_UINT32) { retStatus = timerQueueCancelTimer(pSampleConfiguration->timerQueueHandle, pSampleConfiguration->iceCandidatePairStatsTimerId, (UINT64) pSampleConfiguration); if (STATUS_FAILED(retStatus)) { DLOGE("Failed to cancel stats timer with: 0x%08x", retStatus); } pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32; } if (pSampleConfiguration->pregenerateCertTimerId != MAX_UINT32) { retStatus = timerQueueCancelTimer(pSampleConfiguration->timerQueueHandle, pSampleConfiguration->pregenerateCertTimerId, (UINT64) pSampleConfiguration); if (STATUS_FAILED(retStatus)) { DLOGE("Failed to cancel certificate pre-generation timer with: 0x%08x", retStatus); } pSampleConfiguration->pregenerateCertTimerId = MAX_UINT32; } timerQueueFree(&pSampleConfiguration->timerQueueHandle); } if (pSampleConfiguration->pregeneratedCertificates != NULL) { stackQueueGetIterator(pSampleConfiguration->pregeneratedCertificates, &iterator); while (IS_VALID_ITERATOR(iterator)) { stackQueueIteratorGetItem(iterator, &data); stackQueueIteratorNext(&iterator); freeRtcCertificate((PRtcCertificate) data); } CHK_LOG_ERR(stackQueueClear(pSampleConfiguration->pregeneratedCertificates, FALSE)); CHK_LOG_ERR(stackQueueFree(pSampleConfiguration->pregeneratedCertificates)); pSampleConfiguration->pregeneratedCertificates = NULL; } MEMFREE(*ppSampleConfiguration); *ppSampleConfiguration = NULL; CleanUp: LEAVES(); return retStatus; } STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; PSampleStreamingSession pSampleStreamingSession = NULL; UINT32 i, clientIdHash; BOOL locked = FALSE, peerConnectionFound = FALSE; SIGNALING_CLIENT_STATE signalingClientState; CHK(pSampleConfiguration != NULL, STATUS_NULL_ARG); while (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->interrupted)) { // Keep the main set of operations interlocked until cvar wait which would atomically unlock MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = TRUE; // scan and cleanup terminated streaming session for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) { if (ATOMIC_LOAD_BOOL(&pSampleConfiguration->sampleStreamingSessionList[i]->terminateFlag)) { pSampleStreamingSession = pSampleConfiguration->sampleStreamingSessionList[i]; MUTEX_LOCK(pSampleConfiguration->streamingSessionListReadLock); // swap with last element and decrement count pSampleConfiguration->streamingSessionCount--; pSampleConfiguration->sampleStreamingSessionList[i] = pSampleConfiguration->sampleStreamingSessionList[pSampleConfiguration->streamingSessionCount]; // Remove from the hash table clientIdHash = COMPUTE_CRC32((PBYTE) pSampleStreamingSession->peerId, (UINT32) STRLEN(pSampleStreamingSession->peerId)); CHK_STATUS(hashTableContains(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound)); if (peerConnectionFound) { CHK_STATUS(hashTableRemove(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash)); } MUTEX_UNLOCK(pSampleConfiguration->streamingSessionListReadLock); CHK_STATUS(freeSampleStreamingSession(&pSampleStreamingSession)); } } // Check if we need to re-create the signaling client on-the-fly if (ATOMIC_LOAD_BOOL(&pSampleConfiguration->recreateSignalingClient) && STATUS_SUCCEEDED(freeSignalingClient(&pSampleConfiguration->signalingClientHandle)) && STATUS_SUCCEEDED(createSignalingClientSync(&pSampleConfiguration->clientInfo, &pSampleConfiguration->channelInfo, &pSampleConfiguration->signalingClientCallbacks, pSampleConfiguration->pCredentialProvider, &pSampleConfiguration->signalingClientHandle))) { // Re-set the variable again ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE); } // Check the signaling client state and connect if needed if (IS_VALID_SIGNALING_CLIENT_HANDLE(pSampleConfiguration->signalingClientHandle)) { CHK_STATUS(signalingClientGetCurrentState(pSampleConfiguration->signalingClientHandle, &signalingClientState)); if (signalingClientState == SIGNALING_CLIENT_STATE_READY) { UNUSED_PARAM(signalingClientConnectSync(pSampleConfiguration->signalingClientHandle)); } } // Check if any lingering pending message queues CHK_STATUS(removeExpiredMessageQueues(pSampleConfiguration->pPendingSignalingMessageForRemoteClient)); // periodically wake up and clean up terminated streaming session CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, SAMPLE_SESSION_CLEANUP_WAIT_PERIOD); MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = FALSE; } CleanUp: CHK_LOG_ERR(retStatus); if (locked) { MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); } LEAVES(); return retStatus; } STATUS submitPendingIceCandidate(PPendingMessageQueue pPendingMessageQueue, PSampleStreamingSession pSampleStreamingSession) { STATUS retStatus = STATUS_SUCCESS; BOOL noPendingSignalingMessageForClient = FALSE; PReceivedSignalingMessage pReceivedSignalingMessage = NULL; UINT64 hashValue; CHK(pPendingMessageQueue != NULL && pPendingMessageQueue->messageQueue != NULL && pSampleStreamingSession != NULL, STATUS_NULL_ARG); do { CHK_STATUS(stackQueueIsEmpty(pPendingMessageQueue->messageQueue, &noPendingSignalingMessageForClient)); if (!noPendingSignalingMessageForClient) { hashValue = 0; CHK_STATUS(stackQueueDequeue(pPendingMessageQueue->messageQueue, &hashValue)); pReceivedSignalingMessage = (PReceivedSignalingMessage) hashValue; CHK(pReceivedSignalingMessage != NULL, STATUS_INTERNAL_ERROR); if (pReceivedSignalingMessage->signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE) { CHK_STATUS(handleRemoteCandidate(pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage)); } SAFE_MEMFREE(pReceivedSignalingMessage); } } while (!noPendingSignalingMessageForClient); CHK_STATUS(freeMessageQueue(pPendingMessageQueue)); CleanUp: SAFE_MEMFREE(pReceivedSignalingMessage); CHK_LOG_ERR(retStatus); return retStatus; } STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pReceivedSignalingMessage) { STATUS retStatus = STATUS_SUCCESS; PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData; BOOL peerConnectionFound = FALSE, locked = TRUE, startStats = FALSE; UINT32 clientIdHash; UINT64 hashValue = 0; PPendingMessageQueue pPendingMessageQueue = NULL; PSampleStreamingSession pSampleStreamingSession = NULL; PReceivedSignalingMessage pReceivedSignalingMessageCopy = NULL; CHK(pSampleConfiguration != NULL, STATUS_NULL_ARG); MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = TRUE; clientIdHash = COMPUTE_CRC32((PBYTE) pReceivedSignalingMessage->signalingMessage.peerClientId, (UINT32) STRLEN(pReceivedSignalingMessage->signalingMessage.peerClientId)); CHK_STATUS(hashTableContains(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound)); if (peerConnectionFound) { CHK_STATUS(hashTableGet(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &hashValue)); pSampleStreamingSession = (PSampleStreamingSession) hashValue; } switch (pReceivedSignalingMessage->signalingMessage.messageType) { case SIGNALING_MESSAGE_TYPE_OFFER: // Check if we already have an ongoing master session with the same peer CHK_ERR(!peerConnectionFound, STATUS_INVALID_OPERATION, "Peer connection %s is in progress", pReceivedSignalingMessage->signalingMessage.peerClientId); /* * Create new streaming session for each offer, then insert the client id and streaming session into * pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages. Lastly check if there is * any ice candidate messages queued in pPendingSignalingMessageForRemoteClient. If so then submit * all of them. */ if (pSampleConfiguration->streamingSessionCount == ARRAY_SIZE(pSampleConfiguration->sampleStreamingSessionList)) { DLOGW("Max simultaneous streaming session count reached."); // Need to remove the pending queue if any. // This is a simple optimization as the session cleanup will // handle the cleanup of pending message queue after a while CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue)); CHK(FALSE, retStatus); } CHK_STATUS(createSampleStreamingSession(pSampleConfiguration, pReceivedSignalingMessage->signalingMessage.peerClientId, TRUE, &pSampleStreamingSession)); pSampleStreamingSession->offerReceiveTime = GETTIME(); MUTEX_LOCK(pSampleConfiguration->streamingSessionListReadLock); pSampleConfiguration->sampleStreamingSessionList[pSampleConfiguration->streamingSessionCount++] = pSampleStreamingSession; MUTEX_UNLOCK(pSampleConfiguration->streamingSessionListReadLock); CHK_STATUS(handleOffer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage)); CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession)); // If there are any ice candidate messages in the queue for this client id, submit them now. CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue)); if (pPendingMessageQueue != NULL) { CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession)); // NULL the pointer to avoid it being freed in the cleanup pPendingMessageQueue = NULL; } startStats = pSampleConfiguration->iceCandidatePairStatsTimerId == MAX_UINT32; break; case SIGNALING_MESSAGE_TYPE_ANSWER: /* * for viewer, pSampleStreamingSession should've already been created. insert the client id and * streaming session into pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages. * Lastly check if there is any ice candidate messages queued in pPendingSignalingMessageForRemoteClient. * If so then submit all of them. */ pSampleStreamingSession = pSampleConfiguration->sampleStreamingSessionList[0]; CHK_STATUS(handleAnswer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage)); CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession)); // If there are any ice candidate messages in the queue for this client id, submit them now. CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue)); if (pPendingMessageQueue != NULL) { CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession)); // NULL the pointer to avoid it being freed in the cleanup pPendingMessageQueue = NULL; } break; case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE: /* * if peer connection hasn't been created, create an queue to store the ice candidate message. Otherwise * submit the signaling message into the corresponding streaming session. */ if (!peerConnectionFound) { CHK_STATUS(getPendingMessageQueueForHash(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, FALSE, &pPendingMessageQueue)); if (pPendingMessageQueue == NULL) { CHK_STATUS(createMessageQueue(clientIdHash, &pPendingMessageQueue)); CHK_STATUS(stackQueueEnqueue(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, (UINT64) pPendingMessageQueue)); } pReceivedSignalingMessageCopy = (PReceivedSignalingMessage) MEMCALLOC(1, SIZEOF(ReceivedSignalingMessage)); *pReceivedSignalingMessageCopy = *pReceivedSignalingMessage; CHK_STATUS(stackQueueEnqueue(pPendingMessageQueue->messageQueue, (UINT64) pReceivedSignalingMessageCopy)); // NULL the pointers to not free any longer pPendingMessageQueue = NULL; pReceivedSignalingMessageCopy = NULL; } else { CHK_STATUS(handleRemoteCandidate(pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage)); } break; default: DLOGD("Unhandled signaling message type %u", pReceivedSignalingMessage->signalingMessage.messageType); break; } MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); locked = FALSE; if (startStats && STATUS_FAILED(retStatus = timerQueueAddTimer(pSampleConfiguration->timerQueueHandle, SAMPLE_STATS_DURATION, SAMPLE_STATS_DURATION, getIceCandidatePairStatsCallback, (UINT64) pSampleConfiguration, &pSampleConfiguration->iceCandidatePairStatsTimerId))) { DLOGW("Failed to add getIceCandidatePairStatsCallback to add to timer queue (code 0x%08x). " "Cannot pull ice candidate pair metrics periodically", retStatus); // Reset the returned status retStatus = STATUS_SUCCESS; } CleanUp: SAFE_MEMFREE(pReceivedSignalingMessageCopy); if (pPendingMessageQueue != NULL) { freeMessageQueue(pPendingMessageQueue); } if (locked) { MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock); } CHK_LOG_ERR(retStatus); return retStatus; } STATUS createMessageQueue(UINT64 hashValue, PPendingMessageQueue* ppPendingMessageQueue) { STATUS retStatus = STATUS_SUCCESS; PPendingMessageQueue pPendingMessageQueue = NULL; CHK(ppPendingMessageQueue != NULL, STATUS_NULL_ARG); CHK(NULL != (pPendingMessageQueue = (PPendingMessageQueue) MEMCALLOC(1, SIZEOF(PendingMessageQueue))), STATUS_NOT_ENOUGH_MEMORY); pPendingMessageQueue->hashValue = hashValue; pPendingMessageQueue->createTime = GETTIME(); CHK_STATUS(stackQueueCreate(&pPendingMessageQueue->messageQueue)); CleanUp: if (STATUS_FAILED(retStatus) && pPendingMessageQueue != NULL) { freeMessageQueue(pPendingMessageQueue); pPendingMessageQueue = NULL; } if (ppPendingMessageQueue != NULL) { *ppPendingMessageQueue = pPendingMessageQueue; } return retStatus; } STATUS freeMessageQueue(PPendingMessageQueue pPendingMessageQueue) { STATUS retStatus = STATUS_SUCCESS; // free is idempotent CHK(pPendingMessageQueue != NULL, retStatus); if (pPendingMessageQueue->messageQueue != NULL) { stackQueueClear(pPendingMessageQueue->messageQueue, TRUE); stackQueueFree(pPendingMessageQueue->messageQueue); } MEMFREE(pPendingMessageQueue); CleanUp: return retStatus; } STATUS getPendingMessageQueueForHash(PStackQueue pPendingQueue, UINT64 clientHash, BOOL remove, PPendingMessageQueue* ppPendingMessageQueue) { STATUS retStatus = STATUS_SUCCESS; PPendingMessageQueue pPendingMessageQueue = NULL; StackQueueIterator iterator; BOOL iterate = TRUE; UINT64 data; CHK(pPendingQueue != NULL && ppPendingMessageQueue != NULL, STATUS_NULL_ARG); CHK_STATUS(stackQueueGetIterator(pPendingQueue, &iterator)); while (iterate && IS_VALID_ITERATOR(iterator)) { CHK_STATUS(stackQueueIteratorGetItem(iterator, &data)); CHK_STATUS(stackQueueIteratorNext(&iterator)); pPendingMessageQueue = (PPendingMessageQueue) data; if (clientHash == pPendingMessageQueue->hashValue) { *ppPendingMessageQueue = pPendingMessageQueue; iterate = FALSE; // Check if the item needs to be removed if (remove) { // This is OK to do as we are terminating the iterator anyway CHK_STATUS(stackQueueRemoveItem(pPendingQueue, data)); } } } CleanUp: return retStatus; } STATUS removeExpiredMessageQueues(PStackQueue pPendingQueue) { STATUS retStatus = STATUS_SUCCESS; PPendingMessageQueue pPendingMessageQueue = NULL; UINT32 i, count; UINT64 data, curTime; CHK(pPendingQueue != NULL, STATUS_NULL_ARG); curTime = GETTIME(); CHK_STATUS(stackQueueGetCount(pPendingQueue, &count)); // Dequeue and enqueue in order to not break the iterator while removing an item for (i = 0; i < count; i++) { CHK_STATUS(stackQueueDequeue(pPendingQueue, &data)); // Check for expiry pPendingMessageQueue = (PPendingMessageQueue) data; if (pPendingMessageQueue->createTime + SAMPLE_PENDING_MESSAGE_CLEANUP_DURATION < curTime) { // Message queue has expired and needs to be freed CHK_STATUS(freeMessageQueue(pPendingMessageQueue)); } else { // Enqueue back again as it's still valued CHK_STATUS(stackQueueEnqueue(pPendingQueue, data)); } } CleanUp: return retStatus; }