// ------------------------------------------------------------------------------------------- // Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. // This file is part of the AWS CDI-SDK, licensed under the BSD 2-Clause "Simplified" License. // License details at: https://github.com/aws/aws-cdi-sdk/blob/mainline/LICENSE // ------------------------------------------------------------------------------------------- /** * @file * @brief * This file contains internal definitions and implementation used with the SDK that is not part of the API. */ // Include headers in the following order: Related header, C system headers, other libraries' headers, your project's // headers. #include "internal_tx.h" #include #include "cdi_queue_api.h" #include "endpoint_manager.h" #include "internal.h" #include "payload.h" #include "private.h" #include "statistics.h" //********************************************************************************************************************* //***************************************** START OF DEFINITIONS AND TYPES ******************************************** //********************************************************************************************************************* //********************************************************************************************************************* //*********************************************** START OF VARIABLES ************************************************** //********************************************************************************************************************* //********************************************************************************************************************* //******************************************* START OF STATIC FUNCTIONS *********************************************** //********************************************************************************************************************* /** * Return the next payload number to use for the specified connection. This is an incrementing value. * * @param endpoint_ptr Pointer to endpoint state data. * * @return The payload number. */ static uint16_t GetNextPayloadNum(CdiEndpointState* endpoint_ptr) { CdiOsCritSectionReserve(endpoint_ptr->tx_state.payload_num_lock); uint16_t payload_num = endpoint_ptr->tx_state.payload_num; if (++endpoint_ptr->tx_state.payload_num > endpoint_ptr->adapter_endpoint_ptr->protocol_handle->payload_num_max) { endpoint_ptr->tx_state.payload_num = 0; } CdiOsCritSectionRelease(endpoint_ptr->tx_state.payload_num_lock); return payload_num; } #ifdef DEBUG_TX_PACKET_SGL_ENTRIES /** * Dump Tx packet SGL entries to log or stdout. * * @param protocol_handle Protocol handle. * @param work_request_ptr Pointer to work request state data. */ static void DebugTxPacketSglEntries(CdiProtocolHandle protocol_handle, TxPacketWorkRequest* work_request_ptr) { CdiLogMultilineState m_state; CDI_LOG_THREAD_MULTILINE_BEGIN(kLogInfo, &m_state); CdiDecodedPacketHeader packet_header; CdiSglEntry sgl_entry = *work_request_ptr->packet.sg_list.sgl_head_ptr; ProtocolPayloadHeaderDecode(protocol_handle, sgl_entry.address_ptr, sgl_entry.size_in_bytes, &packet_header); // The payload_data_offset value is not used for packet sequence number zero, since the offset is always // zero. if (0 != packet_header.packet_sequence_num && kPayloadTypeDataOffset == packet_header.payload_type) { CDI_LOG_MULTILINE(&m_state, "Tx Total Packet Size[%d]. Packet Type[%d] Packet[%d/%d] Payload[%d] Offset[%d] Entries:", work_request_ptr->packet.sg_list.total_data_size, packet_header.payload_type, packet_header.packet_sequence_num, packet_header.packet_id, packet_header.payload_num, packet_header.data_offset_info.payload_data_offset); } else { CDI_LOG_MULTILINE(&m_state, "Tx Total Packet Size[%d]. Packet Type[%d] Packet[%d/%d] Payload[%d] Entries:", work_request_ptr->packet.sg_list.total_data_size, packet_header.payload_type, packet_header.packet_sequence_num, packet_header.packet_id, packet_header.payload_num); } CdiSglEntry *packet_entry_ptr = work_request_ptr->packet.sg_list.sgl_head_ptr; while (packet_entry_ptr) { CDI_LOG_MULTILINE(&m_state, "Size[%4d] Addr[%p]", packet_entry_ptr->size_in_bytes, packet_entry_ptr->address_ptr); packet_entry_ptr = packet_entry_ptr->next_ptr; } CDI_LOG_MULTILINE_END(&m_state); } #endif /** * @brief Return work request to pool. * * @param con_state_ptr Pointer to connection state structure. * @param work_request_ptr Pointer to work request structure. */ static void PutWorkRequestInPool(CdiConnectionState* con_state_ptr, TxPacketWorkRequest* work_request_ptr) { if (work_request_ptr->union_ptr) { // NOTE: This pool is not thread-safe, so must ensure that only one thread is accessing it at a time. CdiPoolPut(work_request_ptr->header_pool_handle, work_request_ptr->union_ptr); work_request_ptr->header_pool_handle = NULL; work_request_ptr->union_ptr = NULL; } // NOTE: This pool is not thread-safe, so must ensure that only one thread is accessing it at a time. CdiPoolPut(con_state_ptr->tx_state.work_request_pool_handle, work_request_ptr); } /** * Pop all items in the work request completion queue freeing resources associated with each one. * * @param con_state_ptr Pointer to connection state data. */ static void ProcessWorkRequestCompletionQueue(CdiConnectionState* con_state_ptr) { CdiSinglyLinkedList packet_list = { 0 }; while (CdiQueuePop(con_state_ptr->tx_state.work_req_comp_queue_handle, (void*)&packet_list)) { // Free resources used by the packets that are no longer needed. for (void* item_ptr = CdiSinglyLinkedListPopHead(&packet_list) ; NULL != item_ptr ; item_ptr = CdiSinglyLinkedListPopHead(&packet_list)) { Packet* packet_ptr = CONTAINER_OF(item_ptr, Packet, list_entry); TxPacketWorkRequest* work_request_ptr = (TxPacketWorkRequest*)packet_ptr->sg_list.internal_data_ptr; CdiSglEntry* packet_entry_hdr_ptr = work_request_ptr->packet.sg_list.sgl_head_ptr; #ifdef USE_MEMORY_POOL_APPENDED_LISTS // Since we used CdiPoolGetAppend(), all the pool entries are linked to the first entry and are freed with a // single call to CdiPoolPut(). CdiPoolPut(con_state_ptr->tx_state.packet_sgl_entry_pool_handle, packet_entry_hdr_ptr); #else // Put back SGL entry for each one in the list. FreeSglEntries(con_state_ptr->tx_state.packet_sgl_entry_pool_handle, packet_entry_hdr_ptr); #endif // Put back work request into the pool. PutWorkRequestInPool(con_state_ptr, work_request_ptr); work_request_ptr = NULL; // Pointer is no longer valid, so clear it. } } } /** * Payload thread used to transmit a payload. * * @param ptr Pointer to thread specific data. In this case, a pointer to CdiConnectionState. * * @return The return value is not used. */ static CDI_THREAD TxPayloadThread(void* ptr) { CdiConnectionState* con_state_ptr = (CdiConnectionState*)ptr; // Get a state tracker object for the packetizer. CdiPacketizerStateHandle packetizer_state_handle = PayloadPacketizerCreate(); if (NULL == packetizer_state_handle) { CDI_LOG_THREAD(kLogError, "Failed to create packetizer state."); return 0; } // Set this thread to use the connection's log. Can now use CDI_LOG_THREAD() for logging within this thread. CdiLoggerThreadLogSet(con_state_ptr->log_handle); EndpointManagerHandle mgr_handle = con_state_ptr->endpoint_manager_handle; // Register this thread with the Endpoint Manager as being part of this connection. CdiSignalType notification_signal = EndpointManagerThreadRegister(mgr_handle, CdiOsThreadGetName(con_state_ptr->payload_thread_id)); CdiSignalType comp_queue_signal = CdiQueueGetPopWaitSignal(con_state_ptr->tx_state.work_req_comp_queue_handle); CdiSignalType signal_array[2] = { notification_signal, comp_queue_signal }; // Packets are sent to the endpoint in batches starting with a single packet. The number is doubled with each // batch. This gives a quick start but as the queue backs up, the larger batch sizes lead to higher efficiency // per batch. int batch_size = 1; // These variable are used only within the scope of the while loop below but they must be declared outside of it // since their values need to start initialized but not get reinitialized every time through. bool last_packet = false; TxPacketWorkRequest* work_request_ptr = NULL; CdiSinglyLinkedList packet_list; CdiSinglyLinkedListInit(&packet_list); // The state machine goes through the states like: // // +-----> idle -+ // | | // | +-------+ // | | // | +-> work received ->+ // | | // | +-------------------+ // | | // | +->+-> get work request ->+ // | | | // | | +-------------------+ // | | | // | | +-> packetizing ->+ // | | | // | +<----------------------+ <-- list of packets to enqueue is incomplete // | ^ | // | | +--------------------+ <-- list of packets to enqueue is complete // | | | // | | +-> enqueueing ->+ // | | | // | +-------------------+ <-- not last packet of payload // | | // +----------------------+ <-- last packet of the payload has been successfully queued enum { kPayloadStateIdle, // No payload is in process: wait for payload from queue. kPayloadStateWorkReceived, // A payload was received to be transmitted: initialize for first packet. kPayloadStateGetWorkRequest, // Payload and packetizer initialized: get a work request from pool. kPayloadStatePacketizing, // Have work request: build SGL. kPayloadStateEnqueuing // Have completed list of work requests: queued to the adapter. } payload_processing_state = kPayloadStateIdle; // This loop should only block at the call to CdiQueuePopWaitMultiple(). If a pool runs dry or the output queue is // full, the logic inside of the loop should maintain enough state to suspend the process of packetizing the current // payload and resume when resources are available. TxPayloadState* payload_state_ptr = NULL; while (!CdiOsSignalGet(con_state_ptr->shutdown_signal) && !EndpointManagerIsConnectionShuttingDown(mgr_handle)) { uint32_t signal_index = 0; bool payload_received = false; if (kPayloadStateIdle == payload_processing_state) { // Wait for work from the payload queue, the work request complete queue, or a signal from the endpoint // manager. payload_received = CdiQueuePopWaitMultiple(con_state_ptr->tx_state.payload_queue_handle, CDI_INFINITE, signal_array, 2, &signal_index, (void**)&payload_state_ptr); } else { // A payload is currently in process. Wait for completion requests or a signal from the Endpoint Manager. CdiOsSignalsWait(signal_array, 2, false, CDI_INFINITE, &signal_index); } if (!payload_received) { // Either processing an existing payload or did not get a new one. Got a signal from either the Endpoint // Manager or work_req_comp_queue_handle (the queue contains data). if (0 == signal_index) { // Got a notification_signal. The endpoint state has changed, so wait until it has completed. EndpointManagerThreadWait(mgr_handle); // An Endpoint Manager state change means that Tx resources have been flushed or queued to be flushed, // including the current Tx payload that we could be processing. Reset our current payload state back to // idle. Allow the logic to drop below so if needed ProcessWorkRequestCompletionQueue() is invoked. payload_processing_state = kPayloadStateIdle; payload_state_ptr = NULL; } } else { payload_processing_state = kPayloadStateWorkReceived; // Increment reference counter once at the start of each payload. This will keep the PollThread() working as // long as we have payloads and their related packets to send. CdiOsAtomicInc32(&payload_state_ptr->cdi_endpoint_handle->adapter_endpoint_ptr->tx_in_flight_ref_count); CdiOsSignalSet(con_state_ptr->adapter_connection_ptr->tx_poll_do_work_signal); } // Always check the completion queue here. Don't want to starve it in case either several Endpoint Manager // notifications are received or the payload_queue_handle doesn't go empty. if (CdiOsSignalReadState(comp_queue_signal)) { ProcessWorkRequestCompletionQueue(con_state_ptr); } // Either resume work on a payload in progress or start a new one. if (kPayloadStateWorkReceived == payload_processing_state) { // No packet was in progress so start by initializing for the first one. // Increment payload number. NOTE: This is done here on the read side of the queue rather than on the write // side of the queue because the write side fails if the queue is full. This would cause payload_num to // increment erroneously. By incrementing here on the read side, this problem is avoided. payload_state_ptr->payload_packet_state.payload_num = GetNextPayloadNum(payload_state_ptr->cdi_endpoint_handle); if (CdiLogComponentIsEnabled(con_state_ptr, kLogComponentPayloadConfig)) { // Dump payload configuration to log or stdout. DumpPayloadConfiguration(&payload_state_ptr->app_payload_cb_data.core_extra_data, payload_state_ptr->app_payload_cb_data.extra_data_size, payload_state_ptr->app_payload_cb_data.extra_data_array, con_state_ptr->protocol_type); } // Prepare packetizer for first packet. PayloadPacketizerStateInit(packetizer_state_handle); CdiSinglyLinkedListInit(&packet_list); batch_size = 1; last_packet = false; payload_processing_state = kPayloadStateGetWorkRequest; // Advance the state machine. } bool keep_going = kPayloadStateGetWorkRequest == payload_processing_state || kPayloadStatePacketizing == payload_processing_state || kPayloadStateEnqueuing == payload_processing_state; while (keep_going) { // When the connection goes down, no need to use resources to continue creating packets or adding them to // the adapter's queue. If the adapter's queue gets full it will start generating queue full log message // errors. AdapterEndpointHandle adapter_endpoint_handle = EndpointManagerEndpointToAdapterEndpoint(payload_state_ptr->cdi_endpoint_handle); if (kCdiConnectionStatusConnected != adapter_endpoint_handle->connection_status_code) { break; } if (kPayloadStateGetWorkRequest == payload_processing_state) { // NOTE: This pool is not thread-safe, so must ensure that only one thread is accessing it at a time. if (!CdiPoolGet(con_state_ptr->tx_state.work_request_pool_handle, (void**)&work_request_ptr)) { keep_going = false; } else { // If first packet of a payload and uses extra data, use the extra data pool. if (0 == payload_state_ptr->payload_packet_state.packet_sequence_num && payload_state_ptr->app_payload_cb_data.extra_data_size) { work_request_ptr->header_pool_handle = con_state_ptr->adapter_connection_ptr->tx_extra_header_pool_handle; } else { work_request_ptr->header_pool_handle = con_state_ptr->adapter_connection_ptr->tx_header_pool_handle; } if (!CdiPoolGet(work_request_ptr->header_pool_handle, (void**)&work_request_ptr->union_ptr)) { keep_going = false; } payload_processing_state = kPayloadStatePacketizing; } } if (keep_going && kPayloadStatePacketizing == payload_processing_state) { // NOTE: These pools are not thread-safe, so must ensure that only one thread is accessing them at a // time. if (!PayloadPacketizerPacketGet(adapter_endpoint_handle->protocol_handle, packetizer_state_handle, (char*)work_request_ptr->union_ptr, CdiPoolGetItemSize(work_request_ptr->header_pool_handle), con_state_ptr->tx_state.packet_sgl_entry_pool_handle, payload_state_ptr, &work_request_ptr->packet.sg_list, &last_packet)) { // Pool is empty; suspend processing the payload for now, retry after resources are freed. keep_going = false; } else { #ifdef DEBUG_TX_PACKET_SGL_ENTRIES DebugTxPacketSglEntries(adapter_endpoint_handle->protocol_handle, work_request_ptr); #endif // Fill in the work request with the specifics of the packet. work_request_ptr->payload_state_ptr = payload_state_ptr; work_request_ptr->payload_num = payload_state_ptr->payload_packet_state.payload_num; work_request_ptr->packet_payload_size = payload_state_ptr->payload_packet_state.packet_payload_data_size; // This pointer will be used later by TxPacketWorkRequestComplete() to get access to // work_request_ptr (a pointer to a TxPacketWorkRequest structure). work_request_ptr->packet.sg_list.internal_data_ptr = work_request_ptr; // Set flag for last packet of the payload so ACKs received can keep track of the number of // in-flight payloads. work_request_ptr->packet.payload_last_packet = last_packet; // Add the packet to a list to be enqueued to the adapter. CdiSinglyLinkedListPushTail(&packet_list, &work_request_ptr->packet.list_entry); // Increment reference counter once for each packet. CdiOsAtomicInc32(&payload_state_ptr->cdi_endpoint_handle->adapter_endpoint_ptr->tx_in_flight_ref_count); payload_processing_state = (last_packet || CdiSinglyLinkedListSize(&packet_list) >= batch_size) ? kPayloadStateEnqueuing : kPayloadStateGetWorkRequest; } } if (kPayloadStateEnqueuing == payload_processing_state) { // Enqueue packets. packet_list is copied so it can simply be initialized here to start fresh. if (kCdiStatusOk != CdiAdapterEnqueueSendPackets( EndpointManagerEndpointToAdapterEndpoint(payload_state_ptr->cdi_endpoint_handle), &packet_list)) { keep_going = false; } else { CdiSinglyLinkedListInit(&packet_list); batch_size *= 2; if (last_packet) { // The last packet of the payload has been sent; reset to start a new one. payload_processing_state = kPayloadStateIdle; payload_state_ptr = NULL; keep_going = false; // Successfully put all packets for a payload into Tx queue, so reset the back pressure state. con_state_ptr->back_pressure_state = kCdiBackPressureNone; } else { payload_processing_state = kPayloadStateGetWorkRequest; } } } } } PayloadPacketizerDestroy(packetizer_state_handle); if (EndpointManagerIsConnectionShuttingDown(mgr_handle)) { // Since this thread was registered with the Endpoint Manager using EndpointManagerThreadRegister(), need to // wait for the Endpoint Manager to complete the shutdown. EndpointManagerThreadWait(mgr_handle); } return 0; // Return code not used. } /** * Free function for Tx packet work request pool item. * * @param context_ptr Unused, reserve for future use. * @param item_ptr Pointer to item being initialized. * * @return true always */ static bool TxPacketWorkRequestPoolItemFree(const void* context_ptr, void* item_ptr) { (void)context_ptr; // Not used. TxPacketWorkRequest* work_request_ptr = (TxPacketWorkRequest*)item_ptr; if (work_request_ptr->header_ptr) { CdiPoolPut(work_request_ptr->header_pool_handle, work_request_ptr->union_ptr); work_request_ptr->header_pool_handle = NULL; work_request_ptr->union_ptr = NULL; } return true; } /** * Create an instance of a connection. * * @param protocol_type Connection protocol type. * @param config_data_ptr Pointer to transmitter configuration data. * @param tx_cb_ptr Address of the user function to call whenever a payload being transmitted has been received by the * receiver. * @param ret_handle_ptr Pointer to returned connection handle. * * @return A value from the CdiReturnStatus enumeration. */ static CdiReturnStatus TxCreateConnection(CdiConnectionProtocolType protocol_type, CdiTxConfigData* config_data_ptr, CdiCallback tx_cb_ptr, CdiConnectionHandle* ret_handle_ptr) { CdiReturnStatus rs = kCdiStatusOk; int max_tx_payloads = CDI_MAX_SIMULTANEOUS_TX_PAYLOADS_PER_CONNECTION; // If max_simultaneous_tx_payloads has been set use that value otherwise use // MAX_SIMULTANEOUS_TX_PAYLOADS_PER_CONNECTION if (config_data_ptr->max_simultaneous_tx_payloads) { max_tx_payloads = config_data_ptr->max_simultaneous_tx_payloads; } int max_tx_payload_sgl_entries = CDI_MAX_SIMULTANEOUS_TX_PAYLOAD_SGL_ENTRIES_PER_CONNECTION; // If max_simultaneous_tx_payload_sgl_entries has been set use that value otherwise use // CDI_MAX_SIMULTANEOUS_TX_PAYLOAD_SGL_ENTRIES_PER_CONNECTION if (config_data_ptr->max_simultaneous_tx_payload_sgl_entries) { max_tx_payload_sgl_entries = config_data_ptr->max_simultaneous_tx_payload_sgl_entries; } CdiConnectionState* con_state_ptr = (CdiConnectionState*)CdiOsMemAllocZero(sizeof *con_state_ptr); if (con_state_ptr == NULL) { rs = kCdiStatusNotEnoughMemory; } if (kCdiStatusOk == rs) { con_state_ptr->adapter_state_ptr = (CdiAdapterState*)config_data_ptr->adapter_handle; con_state_ptr->handle_type = kHandleTypeTx; con_state_ptr->protocol_type = protocol_type; con_state_ptr->magic = kMagicConnection; // Make a copy of the configuration data. memcpy(&con_state_ptr->tx_state.config_data, config_data_ptr, sizeof *config_data_ptr); // Make a copy of configuration data strings and update the copy of the config data to use them. NOTE: The // connection_name_str is updated in logic below (see saved_connection_name_str). if (config_data_ptr->dest_ip_addr_str) { CdiOsStrCpy(con_state_ptr->tx_state.copy_dest_ip_addr_str, sizeof(con_state_ptr->tx_state.copy_dest_ip_addr_str), config_data_ptr->dest_ip_addr_str); con_state_ptr->tx_state.config_data.dest_ip_addr_str = con_state_ptr->tx_state.copy_dest_ip_addr_str; } // Save callback address. con_state_ptr->tx_state.cb_ptr = tx_cb_ptr; } // Now that we have a connection logger, we can use the CDI_LOG_HANDLE() macro to add log messages to it. Since this // thread is from the application, we cannot use the CDI_LOG_THEAD() macro. // This log will be used by all the threads created for this connection. if (kCdiStatusOk == rs) { if (kLogMethodFile == config_data_ptr->connection_log_method_data_ptr->log_method) { CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogInfo, "Setting log file[%s] for SDK Tx logging.", config_data_ptr->connection_log_method_data_ptr->log_filename_str); } if (!CdiLoggerCreateLog(cdi_global_context.logger_handle, con_state_ptr, config_data_ptr->connection_log_method_data_ptr, &con_state_ptr->log_handle)) { rs = kCdiStatusCreateLogFailed; } } // Copy the name for the connection from the config data or generate one. NOTE: Do this here, since other logic // below uses the saved name. if ((NULL == config_data_ptr->connection_name_str) || (0 == strlen(config_data_ptr->connection_name_str))) { if (NULL == config_data_ptr->dest_ip_addr_str) { snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "%s:%d", "unknown_ip", config_data_ptr->dest_port); } else { snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "%s:%d", config_data_ptr->dest_ip_addr_str, config_data_ptr->dest_port); } config_data_ptr->connection_name_str = con_state_ptr->saved_connection_name_str; CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Tx connection is unnamed. Created name[%s]", con_state_ptr->saved_connection_name_str); } else { CdiOsStrCpy(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), config_data_ptr->connection_name_str); } // Update copy of config data to use the saved connection string. con_state_ptr->tx_state.config_data.connection_name_str = con_state_ptr->saved_connection_name_str; if (kCdiStatusOk == rs) { CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Creating Tx connection. Protocol[%s] Destination IP[%s] Destination Port[%d] Name[%s]", CdiUtilityKeyEnumToString(kKeyConnectionProtocolType, protocol_type), con_state_ptr->tx_state.config_data.dest_ip_addr_str, con_state_ptr->tx_state.config_data.dest_port, CdiGetEmptyStringIfNull(con_state_ptr->tx_state.config_data.connection_name_str)); } if (kCdiStatusOk == rs) { rs = ConnectionCommonResourcesCreate(con_state_ptr, config_data_ptr->stats_cb_ptr, config_data_ptr->stats_user_cb_param, &config_data_ptr->stats_config); } if (kCdiStatusOk == rs) { // Create queue used to hold Tx payload messages that are sent to the TxPayloadThread() thread. Depth must be // less than the number of TX payloads allowed per connection to allow for proper pushback and payload state // data management. if (!CdiQueueCreate("TxPayloadState queue Pointer", max_tx_payloads-1, CDI_FIXED_QUEUE_SIZE, CDI_FIXED_QUEUE_SIZE, sizeof(TxPayloadState*), kQueueSignalPopWait | kQueueMultipleWritersFlag, // Can use wait signal for pops (reads), // thread safe for multiple writers. &con_state_ptr->tx_state.payload_queue_handle)) { rs = kCdiStatusNotEnoughMemory; } } if (kCdiStatusOk == rs) { // Create worker thread. if (!CdiOsThreadCreate(TxPayloadThread, &con_state_ptr->payload_thread_id, "TxPayload", con_state_ptr, con_state_ptr->start_signal)) { rs = kCdiStatusFatal; } } // Create memory pools. NOTE: These pools do not use any resource locks and are therefore not thread-safe. // TxPayloadThread() is the only user of the pools, except when restarting/shutting down the connection which is // done by EndpointManagerThread() while TxPayloadThread() is blocked. if (kCdiStatusOk == rs) { if (!CdiPoolCreate("Connection Tx TxPacketWorkRequest Pool", MAX_TX_PACKET_WORK_REQUESTS_PER_CONNECTION, NO_GROW_COUNT, NO_GROW_COUNT, sizeof(TxPacketWorkRequest), false, // false= Not thread-safe (no resource locks) &con_state_ptr->tx_state.work_request_pool_handle)) { rs = kCdiStatusNotEnoughMemory; } } if (kCdiStatusOk == rs) { if (!CdiPoolCreate("Connection Tx CdiSglEntry Pool", TX_PACKET_SGL_ENTRY_SIZE_PER_CONNECTION, NO_GROW_SIZE, NO_GROW_COUNT, sizeof(CdiSglEntry), false, // false= Not thread-safe (no resource locks) &con_state_ptr->tx_state.packet_sgl_entry_pool_handle)) { rs = kCdiStatusNotEnoughMemory; } } if (kCdiStatusOk == rs) { // There is a limit on the number of simultaneous Tx payloads per connection, so don't allow this pool to grow. if (!CdiPoolCreate("Connection Tx Payload State Pool", max_tx_payloads, NO_GROW_SIZE, NO_GROW_COUNT, sizeof(TxPayloadState), true, // true= Is thread-safe. &con_state_ptr->tx_state.payload_state_pool_handle)) { rs = kCdiStatusNotEnoughMemory; } } if (kCdiStatusOk == rs) { if (!CdiPoolCreate("Connection Tx Payload CdiSglEntry Pool", max_tx_payload_sgl_entries, NO_GROW_SIZE, NO_GROW_COUNT, sizeof(CdiSglEntry), true, // true= Is thread-safe. &con_state_ptr->tx_state.payload_sgl_entry_pool_handle)) { rs = kCdiStatusNotEnoughMemory; } } if (kCdiStatusOk == rs) { if (!CdiQueueCreate("Connection Tx TxPacketWorkRequest* Queue", MAX_TX_PACKETS_PER_CONNECTION, TX_PACKET_POOL_SIZE_GROW, MAX_POOL_GROW_COUNT, sizeof(CdiSinglyLinkedList), kQueueSignalPopWait, // Make a blockable reader. &con_state_ptr->tx_state.work_req_comp_queue_handle)) { rs = kCdiStatusNotEnoughMemory; } } if (kCdiStatusOk == rs) { // Create a packet message thread that is used by both Tx and Rx connections. rs = ConnectionCommonPacketMessageThreadCreate(con_state_ptr, "Tx:PayloadMessage"); } if (kCdiStatusOk == rs) { CdiAdapterConnectionConfigData config_data = { .cdi_adapter_handle = con_state_ptr->adapter_state_ptr, .cdi_connection_handle = con_state_ptr, .endpoint_manager_handle = con_state_ptr->endpoint_manager_handle, .connection_cb_ptr = con_state_ptr->tx_state.config_data.connection_cb_ptr, .connection_user_cb_param = con_state_ptr->tx_state.config_data.connection_user_cb_param, .log_handle = con_state_ptr->log_handle, .shared_thread_id = config_data_ptr->shared_thread_id, .thread_core_num = config_data_ptr->thread_core_num, .direction = kEndpointDirectionSend, .port_number = con_state_ptr->tx_state.config_data.dest_port, .bind_ip_addr_str = con_state_ptr->tx_state.config_data.bind_ip_addr_str, // This endpoint is used for normal data transmission (not used for control). This means that the Endpoint // Manager is used for managing threads related to the connection. .data_type = kEndpointTypeData, }; if (kCdiStatusOk != CdiAdapterCreateConnection(&config_data, &con_state_ptr->adapter_connection_ptr)) { rs = kCdiStatusFatal; } } if (kCdiStatusOk != rs) { ConnectionDestroyInternal((CdiConnectionHandle)con_state_ptr); con_state_ptr = NULL; } *ret_handle_ptr = (CdiConnectionHandle)con_state_ptr; return rs; } /** * Payload transfer has completed either successfully or in error. Update stats and queue payload message to * application. * * @param endpoint_ptr Pointer to endpoint state data. * @param payload_state_ptr Pointer to payload state data. The pointer is no longer valid after function returns. */ static void PayloadTransferComplete(CdiEndpointState* endpoint_ptr, TxPayloadState* payload_state_ptr) { CdiConnectionState* con_state_ptr = (CdiConnectionState*)endpoint_ptr->connection_state_ptr; StatsGatherPayloadStatsFromConnection(endpoint_ptr, kCdiStatusOk == payload_state_ptr->app_payload_cb_data.payload_status_code, payload_state_ptr->start_time, payload_state_ptr->max_latency_microsecs, payload_state_ptr->data_bytes_transferred); // Copy the payload's source SGL to the callback data, so we can free the SGL entries in AppCallbackPayloadThread() // to reduce the amount of work required here by the Tx Poll() thread. This also allows the payload_state_ptr to // be freed in this function, since it is no longer needed. payload_state_ptr->app_payload_cb_data.tx_source_sgl = payload_state_ptr->source_sgl; // Post message to notify application that payload transfer has completed. if (!CdiQueuePush(con_state_ptr->app_payload_message_queue_handle, &payload_state_ptr->app_payload_cb_data)) { CDI_LOG_THREAD(kLogError, "Queue[%s] full, push failed.", CdiQueueGetName(con_state_ptr->app_payload_message_queue_handle)); // Since queue was full, need to free the resources associated with the payload. CdiSglEntry* entry_ptr = payload_state_ptr->app_payload_cb_data.tx_source_sgl.sgl_head_ptr; while (entry_ptr) { CdiSglEntry* next_ptr = entry_ptr->next_ptr; // Save next entry, since Put() will free its memory. CdiPoolPut(con_state_ptr->tx_state.payload_sgl_entry_pool_handle, entry_ptr); entry_ptr = next_ptr; } // If error message exists, return it to pool. PayloadErrorFreeBuffer(con_state_ptr->error_message_pool, &payload_state_ptr->app_payload_cb_data); } // Done with payload state data, so free it. CdiPoolPut(con_state_ptr->tx_state.payload_state_pool_handle, payload_state_ptr); } /** * Flush a payload that did not complete transferring. This will set the payload's status and queue a payload message * to the application. * * @param endpoint_ptr Pointer to endpoint state data. * @param payload_state_ptr Pointer to payload state data. The pointer is not longer valid after function returns. */ static void FlushFailedPayload(CdiEndpointState* endpoint_ptr, TxPayloadState* payload_state_ptr) { if (kCdiStatusOk == payload_state_ptr->app_payload_cb_data.payload_status_code) { payload_state_ptr->app_payload_cb_data.payload_status_code = kCdiStatusSendFailed; } // Clear this list. It will be cleared by TxPayloadThreadFlushResources(). See packet_sgl_entry_pool_handle pool. CdiSinglyLinkedListInit(&payload_state_ptr->completed_packets_list); // Queue message to the application. PayloadTransferComplete(endpoint_ptr, payload_state_ptr); } //********************************************************************************************************************* //******************************************* START OF PUBLIC FUNCTIONS *********************************************** //********************************************************************************************************************* CdiReturnStatus TxCreateInternal(CdiConnectionProtocolType protocol_type, CdiTxConfigData* config_data_ptr, CdiCallback tx_cb_ptr, CdiConnectionHandle* ret_handle_ptr) { CdiReturnStatus rs = TxCreateConnection(protocol_type, config_data_ptr, tx_cb_ptr, ret_handle_ptr); if (kCdiStatusOk == rs) { CdiConnectionState* con_state_ptr = *((CdiConnectionState**)ret_handle_ptr); rs = EndpointManagerTxCreateEndpoint(con_state_ptr->endpoint_manager_handle, false, config_data_ptr->dest_ip_addr_str, config_data_ptr->dest_port, NULL, &con_state_ptr->default_tx_endpoint_ptr); } return rs; } CdiReturnStatus TxStreamConnectionCreateInternal(CdiTxConfigData* config_data_ptr, CdiCallback tx_cb_ptr, CdiConnectionHandle* ret_handle_ptr) { return TxCreateConnection(kProtocolTypeAvm, config_data_ptr, tx_cb_ptr, ret_handle_ptr); } CdiReturnStatus TxStreamEndpointCreateInternal(CdiConnectionHandle handle, CdiTxConfigDataStream* stream_config_ptr, CdiEndpointHandle* ret_handle_ptr) { return EndpointManagerTxCreateEndpoint(handle->endpoint_manager_handle, true, stream_config_ptr->dest_ip_addr_str, stream_config_ptr->dest_port, stream_config_ptr->stream_name_str, ret_handle_ptr); } CdiReturnStatus TxPayloadInternal(CdiEndpointState* endpoint_ptr, const CdiCoreTxPayloadConfig* core_payload_config_ptr, const CdiSgList* sgl_ptr, int max_latency_microsecs, int extra_data_size, uint8_t* extra_data_ptr) { assert(sgl_ptr->total_data_size > 0); uint64_t start_time = CdiOsGetMicroseconds(); CdiReturnStatus rs = kCdiStatusOk; CdiConnectionState* con_state_ptr = endpoint_ptr->connection_state_ptr; if (kCdiConnectionStatusConnected != endpoint_ptr->adapter_endpoint_ptr->connection_status_code) { // Currently not connected, so no need to advance the payload any further here. return kCdiStatusNotConnected; } // Get free entry for payload state data from pool. NOTE: This pool is thread-safe, since it is used by application // thread(s) here and by TxPayloadThread(). TxPayloadState* payload_state_ptr = NULL; if (!CdiPoolGet(con_state_ptr->tx_state.payload_state_pool_handle, (void**)&payload_state_ptr)) { // No free entries are available. Since this pool does not dynamically grow the queue used below must be full, // so return the queue full status here. rs = kCdiStatusQueueFull; } else { memset((void*)payload_state_ptr, 0, sizeof(TxPayloadState)); payload_state_ptr->app_payload_cb_data.core_extra_data = core_payload_config_ptr->core_extra_data; payload_state_ptr->app_payload_cb_data.tx_payload_user_cb_param = core_payload_config_ptr->user_cb_param; payload_state_ptr->start_time = start_time; payload_state_ptr->max_latency_microsecs = max_latency_microsecs; CdiSinglyLinkedListInit(&payload_state_ptr->completed_packets_list); // Calculate the size of a group of units of unit_size. int group_size = 1; // How many units of unit_size need to be grouped to be byte aligned. if (core_payload_config_ptr->unit_size > 0) { switch (core_payload_config_ptr->unit_size % 8) { case 0: group_size = 1; break; case 2: group_size = 4; break; case 4: group_size = 2; break; case 6: group_size = 4; break; default: // For a fixed unit_size worst case of 8 units together will always be byte aligned. group_size = 8; break; } } payload_state_ptr->group_size_bytes = (group_size * core_payload_config_ptr->unit_size) / 8; payload_state_ptr->app_payload_cb_data.extra_data_size = extra_data_size; if (extra_data_size) { memcpy(&payload_state_ptr->app_payload_cb_data.extra_data_array, extra_data_ptr, extra_data_size); } payload_state_ptr->cdi_endpoint_handle = endpoint_ptr; // Save the endpoint used to send this payload. if (!PayloadInit(con_state_ptr, sgl_ptr, payload_state_ptr)) { rs = kCdiStatusAllocationFailed; } else { // Put Tx payload message into the payload queue. The TxPayloadThread() thread will then process the // message. Don't block here and wait if the queue is full, return an error. if (!CdiQueuePush(con_state_ptr->tx_state.payload_queue_handle, &payload_state_ptr)) { // Queue was full, put the allocated memory back in the pools. rs = kCdiStatusQueueFull; } } if (kCdiStatusOk != rs) { // An error occurred, so free pool buffers reserved here and in CdiPayloadInit(). CdiSglEntry* entry_ptr = payload_state_ptr->source_sgl.sgl_head_ptr; while (entry_ptr) { CdiSglEntry* next_ptr = entry_ptr->next_ptr; // Save next entry, since Put() will free its memory. CdiPoolPut(con_state_ptr->tx_state.payload_sgl_entry_pool_handle, entry_ptr); entry_ptr = next_ptr; } CdiPoolPut(con_state_ptr->tx_state.payload_state_pool_handle, payload_state_ptr); } } return rs; } void TxPayloadThreadFlushResources(CdiEndpointState* endpoint_ptr) { CdiConnectionState* con_state_ptr = (CdiConnectionState*)endpoint_ptr->connection_state_ptr; CdiQueueFlush(con_state_ptr->tx_state.payload_queue_handle); // Process items in the work request completion queue. This will drain the queue and free associated resources // (ie. work_request_pool_handle) before we manually remove resources below. PayloadTransferComplete() has already // been called for all items in this queue (so don't call it again here). ProcessWorkRequestCompletionQueue(con_state_ptr); // Walk through the work request pool and free associated resources. // NOTE: All the pools used in this function are not thread-safe, so must ensure that only one thread is accessing // them at a time. TxPayloadState* payload_state_ptr = NULL; TxPacketWorkRequest* work_request_ptr = NULL; while (CdiPoolPeekInUse(con_state_ptr->tx_state.work_request_pool_handle, (void**)&work_request_ptr)) { // NOTE: PayloadTransferComplete() called from FlushFailedPayload() frees the payload pointer, so only call // FlushFailedPayload() after walking to the end of the list of work requests related to it. if (payload_state_ptr != work_request_ptr->payload_state_ptr) { if (NULL == payload_state_ptr) { payload_state_ptr = work_request_ptr->payload_state_ptr; } else { FlushFailedPayload(endpoint_ptr, payload_state_ptr); // Frees the payload pointer payload_state_ptr = NULL; // Pointer is no longer valid, so clear it. } } // Free all packet SGL entries related to this work request. This will free all entries that have been // completed successfully and ones that have not. CdiSglEntry* packet_entry_hdr_ptr = work_request_ptr->packet.sg_list.sgl_head_ptr; if (packet_entry_hdr_ptr) { // Put back SGL entry for each one in the list. FreeSglEntries(con_state_ptr->tx_state.packet_sgl_entry_pool_handle, packet_entry_hdr_ptr); } // Put back work request into the pool. PutWorkRequestInPool(con_state_ptr, work_request_ptr); work_request_ptr = NULL; // Pointer is no longer valid, so clear it. } // Flush last payload if value exists. if (payload_state_ptr) { FlushFailedPayload(endpoint_ptr, payload_state_ptr); payload_state_ptr = NULL; // Pointer is no longer valid, so clear it. } CdiPoolPutAll(con_state_ptr->tx_state.payload_state_pool_handle); // Don't free tx_state.payload_sgl_entry_pool_handle here. AppCallbackPayloadThread() frees them. When a connection // is destroyed, the pool is flushed in TxConnectionDestroyInternal(). // Free Tx header pool entries from each item in the work request pool. CdiPoolForEachItem(con_state_ptr->tx_state.work_request_pool_handle, TxPacketWorkRequestPoolItemFree, NULL); CdiPoolPutAll(con_state_ptr->tx_state.work_request_pool_handle); CdiQueueFlush(con_state_ptr->tx_state.work_req_comp_queue_handle); CdiPoolPutAll(con_state_ptr->tx_state.payload_state_pool_handle); // Don't free tx_state.payload_sgl_entry_pool_handle here. AppCallbackPayloadThread() frees them. When a connection // is destroyed, the pool is flushed in TxConnectionDestroyInternal(). CdiPoolPutAll(con_state_ptr->tx_state.packet_sgl_entry_pool_handle); // NOTE: Don't flush app_payload_message_queue_handle here. Entries are popped using AppCallbackPayloadThread(). // When a connection is destroyed, they are flushed in TxConnectionDestroyInternal(). con_state_ptr->back_pressure_state = kCdiBackPressureNone; // Reset the back pressure state. endpoint_ptr->tx_state.payload_num = 0; // Clear payload number so receiver can expect payload zero first. endpoint_ptr->tx_state.packet_id = 0; // Reset packet ID to zero. } CdiReturnStatus TxConnectionThreadJoin(CdiConnectionHandle con_handle) { CdiConnectionState* con_state_ptr = (CdiConnectionState*)con_handle; if (con_state_ptr) { // Clean-up thread resources. We will wait for it to exit using thread join. SdkThreadJoin(con_state_ptr->payload_thread_id, con_state_ptr->shutdown_signal); con_state_ptr->payload_thread_id = NULL; } return kCdiStatusOk; } void TxConnectionDestroyInternal(CdiConnectionHandle con_handle) { CdiConnectionState* con_state_ptr = (CdiConnectionState*)con_handle; if (con_state_ptr) { // Destroying connection, so ensure app payload queues and pools are drained. NOTE: This must be done after // the poll thread and AppCallbackPayloadThread have stopped. AppPayloadCallbackData app_cb_data; while (CdiQueuePop(con_state_ptr->app_payload_message_queue_handle, (void**)&app_cb_data)) { PayloadErrorFreeBuffer(con_state_ptr->error_message_pool, &app_cb_data); } CdiPoolPutAll(con_state_ptr->tx_state.payload_sgl_entry_pool_handle); // Now that the connection and adapter threads have stopped, it is safe to clean up the remaining resources in // the opposite order of their creation. CdiQueueDestroy(con_state_ptr->tx_state.work_req_comp_queue_handle); con_state_ptr->tx_state.work_req_comp_queue_handle = NULL; // The application may be destroying the connection with Tx payloads in flight, so ensure this pool has been // flushed before destroying it. CdiPoolPutAll(con_state_ptr->tx_state.payload_sgl_entry_pool_handle); CdiPoolDestroy(con_state_ptr->tx_state.payload_sgl_entry_pool_handle); con_state_ptr->tx_state.payload_sgl_entry_pool_handle = NULL; CdiPoolDestroy(con_state_ptr->tx_state.payload_state_pool_handle); con_state_ptr->tx_state.payload_state_pool_handle = NULL; CdiPoolDestroy(con_state_ptr->tx_state.packet_sgl_entry_pool_handle); con_state_ptr->tx_state.packet_sgl_entry_pool_handle = NULL; // Free Tx header pool entries from each item in the work request pool. CdiPoolForEachItem(con_state_ptr->tx_state.work_request_pool_handle, TxPacketWorkRequestPoolItemFree, NULL); CdiPoolDestroy(con_state_ptr->tx_state.work_request_pool_handle); con_state_ptr->tx_state.work_request_pool_handle = NULL; CdiQueueDestroy(con_state_ptr->tx_state.payload_queue_handle); con_state_ptr->tx_state.payload_queue_handle = NULL; // NOTE: con_state_ptr is freed by the caller. } } void TxEndpointDestroy(CdiEndpointHandle handle) { CdiEndpointState* endpoint_ptr = (CdiEndpointState*)handle; CdiOsCritSectionDelete(endpoint_ptr->tx_state.payload_num_lock); endpoint_ptr->tx_state.payload_num_lock = NULL; } void TxPacketWorkRequestComplete(void* param_ptr, Packet* packet_ptr, EndpointMessageType message_type) { assert(kEndpointMessageTypePacketSent == message_type); (void)message_type; CdiEndpointState* endpoint_ptr = (CdiEndpointState*)param_ptr; CdiConnectionState* con_state_ptr = endpoint_ptr->connection_state_ptr; CdiAdapterTxPacketComplete(endpoint_ptr->adapter_endpoint_ptr, packet_ptr); if (kAdapterPacketStatusNotConnected == packet_ptr->tx_state.ack_status) { return; } // The internal_data_ptr contains a work request pointer that was set in TxPayloadThread(). TxPacketWorkRequest* work_request_ptr = (TxPacketWorkRequest*)packet_ptr->sg_list.internal_data_ptr; // Now that we have our work request, we can setup additional state data pointers. TxPayloadState* payload_state_ptr = work_request_ptr->payload_state_ptr; // Check if the packet is from the payload that we are currently processing. if (payload_state_ptr->payload_packet_state.payload_num != work_request_ptr->payload_num) { CDI_LOG_THREAD(kLogWarning, "Connection[%s] packet for payload[%d] not from current payload[%d]", endpoint_ptr->connection_state_ptr->saved_connection_name_str, payload_state_ptr->payload_packet_state.payload_num, work_request_ptr->payload_num); } else { payload_state_ptr->data_bytes_transferred += work_request_ptr->packet_payload_size; if (kPayloadTypeKeepAlive == payload_state_ptr->payload_packet_state.payload_type) { // Payload type is keep alive. Keep it internal and do not use the application callback. Nothing special to // do here, unless payload data was allocated dynamically using a pool. If so, will need to free it here. } else { CdiSinglyLinkedListPushTail(&payload_state_ptr->completed_packets_list, (void*)&work_request_ptr->packet.list_entry); if (payload_state_ptr->data_bytes_transferred >= payload_state_ptr->source_sgl.total_data_size) { // Payload transfer complete. Pointer is freed below in PayloadTransferComplete(). Clear it now so it // cannot be accidentally used later. work_request_ptr->payload_state_ptr = NULL; // Put list of work requests in queue so TxPayloadThread() can free the allocated resources. if (!CdiQueuePush(con_state_ptr->tx_state.work_req_comp_queue_handle, &payload_state_ptr->completed_packets_list)) { CDI_LOG_THREAD(kLogError, "Queue[%s] full, push failed.", CdiQueueGetName(con_state_ptr->tx_state.work_req_comp_queue_handle)); } work_request_ptr = NULL; // Pointer may no longer be valid, so clear it now. // Updates stats and puts message in queue to call the user registered Tx callback function. PayloadTransferComplete(endpoint_ptr, payload_state_ptr); payload_state_ptr = NULL; // Pointer is no longer valid. } } } } void TxInvokeAppPayloadCallback(CdiConnectionState* con_state_ptr, AppPayloadCallbackData* app_cb_data_ptr) { CdiCoreCbData core_cb_data = { .status_code = app_cb_data_ptr->payload_status_code, .err_msg_str = app_cb_data_ptr->error_message_str, .connection_handle = (CdiConnectionHandle)con_state_ptr, .core_extra_data = app_cb_data_ptr->core_extra_data, .user_cb_param = app_cb_data_ptr->tx_payload_user_cb_param, }; if (kProtocolTypeRaw == con_state_ptr->protocol_type) { // Raw protocol so calling CdiRawTxCallback(). CdiRawTxCbData cb_data = { .core_cb_data = core_cb_data }; CdiRawTxCallback raw_tx_cb_ptr = (CdiRawTxCallback)con_state_ptr->tx_state.cb_ptr; (raw_tx_cb_ptr)(&cb_data); // Call the user-registered callback function. } else { // AVM protocol so calling CdiAvmTxCallback(). CDIPacketAvmCommonHeader* avm_common_header_ptr = (CDIPacketAvmCommonHeader*)&app_cb_data_ptr->extra_data_array; CdiAvmTxCbData cb_data = { .core_cb_data = core_cb_data, .avm_extra_data = avm_common_header_ptr->avm_extra_data }; CdiAvmTxCallback avm_tx_cb_ptr = (CdiAvmTxCallback)con_state_ptr->tx_state.cb_ptr; (avm_tx_cb_ptr)(&cb_data); // Call the user-registered callback function. } }