/*
 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates.
 * All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#if HAVE_CONFIG_H
#include <config.h>
#endif /* HAVE_CONFIG_H */

#ifndef _RXR_H_
#define _RXR_H_

#include <pthread.h>
#include <rdma/fabric.h>
#include <rdma/fi_atomic.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_rma.h>
#include <rdma/fi_tagged.h>
#include <rdma/fi_trigger.h>

#include <ofi.h>
#include <ofi_iov.h>
#include <ofi_proto.h>
#include <ofi_enosys.h>
#include <ofi_rbuf.h>
#include <ofi_list.h>
#include <ofi_util.h>
#include <ofi_tree.h>
#include <uthash.h>
#include <ofi_recvwin.h>
#include <ofi_perf.h>
#include <ofi_hmem.h>

#include "rxr_pkt_entry.h"
#include "rxr_pkt_type.h"

#define RXR_MAJOR_VERSION	(2)
#define RXR_MINOR_VERSION	(0)

/*
 * EFA supports interoperability between protocol version 4 and above,
 * and version 4 is considered the base version.
 */
#define RXR_BASE_PROTOCOL_VERSION	(4)
#define RXR_CUR_PROTOCOL_VERSION	(4)
#define RXR_NUM_PROTOCOL_VERSION	(RXR_CUR_PROTOCOL_VERSION - RXR_BASE_PROTOCOL_VERSION + 1)
#define RXR_MAX_PROTOCOL_VERSION	(100)

#define RXR_FI_VERSION		OFI_VERSION_LATEST

#define RXR_IOV_LIMIT		(4)

#ifdef ENABLE_EFA_POISONING
extern const uint32_t rxr_poison_value;
static inline void rxr_poison_mem_region(uint32_t *ptr, size_t size)
{
	int i;

	for (i = 0; i < size / sizeof(rxr_poison_value); i++)
		memcpy(ptr + i, &rxr_poison_value, sizeof(rxr_poison_value));
}
#endif
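/*
 * Added note (not in the original header): when ENABLE_EFA_POISONING is
 * defined, the release helpers further down overwrite a freed entry with
 * rxr_poison_value before returning it to its buffer pool, e.g.:
 *
 *	rxr_poison_mem_region((uint32_t *)tx_entry,
 *			      sizeof(struct rxr_tx_entry));
 *
 * so that use-after-free accesses show up as a recognizable pattern.
 */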
/*
 * Set alignment to x86 cache line size.
 */
#define RXR_BUF_POOL_ALIGNMENT	(64)

/*
 * The following parameters will be exposed as environment variables for tuning.
 */
#define RXR_RECVWIN_SIZE		(16384)
#define RXR_DEF_CQ_SIZE			(8192)
#define RXR_REMOTE_CQ_DATA_LEN		(8)

/* maximum timeout for RNR backoff (microseconds) */
#define RXR_DEF_RNR_MAX_TIMEOUT		(1000000)
/* bounds for random RNR backoff timeout */
#define RXR_RAND_MIN_TIMEOUT		(40)
#define RXR_RAND_MAX_TIMEOUT		(120)

/* bounds for flow control */
#define RXR_DEF_MAX_RX_WINDOW		(128)
#define RXR_DEF_MAX_TX_CREDITS		(64)
#define RXR_DEF_MIN_TX_CREDITS		(32)

/*
 * Maximum time (microseconds) we will allow available_bufs for large
 * messages to be exhausted.
 */
#define RXR_AVAILABLE_DATA_BUFS_TIMEOUT	(5000000)

#if ENABLE_DEBUG
#define RXR_TX_PKT_DBG_SIZE	(16384)
#define RXR_RX_PKT_DBG_SIZE	(16384)
#endif

/*
 * Based on the size of tx_id and rx_id in headers; can be arbitrary once
 * those are removed.
 */
#define RXR_MAX_RX_QUEUE_SIZE	(UINT32_MAX)
#define RXR_MAX_TX_QUEUE_SIZE	(UINT32_MAX)

/*
 * The maximum supported source address length in bytes
 */
#define RXR_MAX_NAME_LENGTH	(32)

/*
 * RxR specific flags that are sent over the wire.
 */
#define RXR_TAGGED		BIT_ULL(0)
#define RXR_REMOTE_CQ_DATA	BIT_ULL(1)
#define RXR_REMOTE_SRC_ADDR	BIT_ULL(2)

/*
 * TODO: In the future we will send a RECV_CANCEL signal to the sender to
 * stop transmitting a large message; this flag is also used for fi_discard,
 * which has similar behavior.
 */
#define RXR_RECV_CANCEL		BIT_ULL(3)

/*
 * Flags to tell if the rx_entry is tracking FI_MULTI_RECV buffers
 */
#define RXR_MULTI_RECV_POSTED	BIT_ULL(4)
#define RXR_MULTI_RECV_CONSUMER	BIT_ULL(5)

/*
 * OFI flags
 * The 64-bit flag field is used as follows:
 * 1-grow up    common (usable with multiple operations)
 * 59-grow down operation specific (used for single call/class)
 * 60 - 63      provider specific
 */
#define RXR_NO_COMPLETION	BIT_ULL(60)

/*
 * RM flags
 */
#define RXR_RM_TX_CQ_FULL	BIT_ULL(0)
#define RXR_RM_RX_CQ_FULL	BIT_ULL(1)

#define RXR_MTU_MAX_LIMIT	BIT_ULL(15)

extern struct fi_info *shm_info;

extern struct fi_provider *lower_efa_prov;
extern struct fi_provider rxr_prov;
extern struct fi_info rxr_info;
extern struct rxr_env rxr_env;
extern struct fi_fabric_attr rxr_fabric_attr;
extern struct util_prov rxr_util_prov;
extern struct efa_ep_addr *local_efa_addr;

struct rxr_env {
	int rx_window_size;
	int tx_min_credits;
	int tx_max_credits;
	int tx_queue_size;
	int use_device_rdma;
	int enable_shm_transfer;
	int shm_av_size;
	int shm_max_medium_size;
	int recvwin_size;
	int cq_size;
	size_t max_memcpy_size;
	size_t mtu_size;
	size_t tx_size;
	size_t rx_size;
	size_t tx_iov_limit;
	size_t rx_iov_limit;
	int rx_copy_unexp;
	int rx_copy_ooo;
	int max_timeout;
	int timeout_interval;
	size_t efa_cq_read_size;
	size_t shm_cq_read_size;
	size_t efa_max_medium_msg_size;
	size_t efa_min_read_write_size;
	size_t efa_read_segment_size;
};

enum rxr_lower_ep_type {
	EFA_EP = 1,
	SHM_EP,
};

enum rxr_x_entry_type {
	RXR_TX_ENTRY = 1,
	RXR_RX_ENTRY,
};

enum rxr_tx_comm_type {
	RXR_TX_FREE = 0,	/* tx_entry free state */
	RXR_TX_REQ,		/* tx_entry sending REQ packet */
	RXR_TX_SEND,		/* tx_entry sending data in progress */
	RXR_TX_QUEUED_SHM_RMA,	/* tx_entry was unable to send RMA operations over shm provider */
	RXR_TX_QUEUED_CTRL,	/* tx_entry was unable to send ctrl packet */
	RXR_TX_QUEUED_REQ_RNR,	/* tx_entry RNR sending REQ packet */
	RXR_TX_QUEUED_DATA_RNR,	/* tx_entry RNR sending data packets */
	RXR_TX_SENT_READRSP,	/* tx_entry (on remote EP) sent
				 * read response (FI_READ only)
				 */
	RXR_TX_QUEUED_READRSP,	/* tx_entry (on remote EP) was
				 * unable to send read response
				 * (FI_READ only)
				 */
	RXR_TX_WAIT_READ_FINISH, /* tx_entry (on initiating EP) wait
				  * for rx_entry to finish receiving
				  * (FI_READ only)
				  */
};
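/*
 * Added note: judging from the state comments above, a typical send
 * progresses roughly as
 *
 *	RXR_TX_FREE -> RXR_TX_REQ -> RXR_TX_SEND -> RXR_TX_FREE
 *
 * while the RXR_TX_QUEUED_* states are entered when a packet could not be
 * posted (ctrl/RMA failure or RNR) and the entry is retried later from the
 * endpoint's queued lists by the progress engine.
 */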
enum rxr_rx_comm_type {
	RXR_RX_FREE = 0,	/* rx_entry free state */
	RXR_RX_INIT,		/* rx_entry ready to recv RTM */
	RXR_RX_UNEXP,		/* rx_entry unexp msg waiting for post recv */
	RXR_RX_MATCHED,		/* rx_entry matched with RTM */
	RXR_RX_RECV,		/* rx_entry large msg recv data pkts */
	RXR_RX_QUEUED_CTRL,	/* rx_entry was unable to send ctrl packet */
	RXR_RX_QUEUED_EOR,	/* rx_entry was unable to send EOR over shm */
	RXR_RX_QUEUED_CTS_RNR,	/* rx_entry RNR sending CTS */
	RXR_RX_WAIT_READ_FINISH, /* rx_entry wait for send to finish, FI_READ */
	RXR_RX_WAIT_ATOMRSP_SENT, /* rx_entry wait for atomrsp packet sent completion */
};

#define RXR_PEER_REQ_SENT		BIT_ULL(0) /* sent a REQ to the peer, peer should send a handshake back */
#define RXR_PEER_HANDSHAKE_SENT		BIT_ULL(1)
#define RXR_PEER_HANDSHAKE_RECEIVED	BIT_ULL(2)
#define RXR_PEER_IN_BACKOFF		BIT_ULL(3) /* peer is in backoff, not allowed to send */
#define RXR_PEER_BACKED_OFF		BIT_ULL(4) /* peer backoff was increased during this loop of the progress engine */

struct rxr_fabric {
	struct util_fabric util_fabric;
	struct fid_fabric *lower_fabric;
	struct fid_fabric *shm_fabric;
#ifdef RXR_PERF_ENABLED
	struct ofi_perfset perf_set;
#endif
};

#define RXR_MAX_NUM_PROTOCOLS (RXR_MAX_PROTOCOL_VERSION - RXR_BASE_PROTOCOL_VERSION + 1)

struct rxr_peer {
	bool tx_init;			/* tracks initialization of tx state */
	bool rx_init;			/* tracks initialization of rx state */
	bool is_self;			/* self flag */
	bool is_local;			/* local/remote peer flag */
	fi_addr_t shm_fiaddr;		/* fi_addr_t addr from shm provider */
	struct rxr_robuf *robuf;	/* tracks expected msg_id on rx */
	uint32_t next_msg_id;		/* sender's view of msg_id */
	uint32_t flags;
	uint32_t maxproto;		/* maximum protocol version supported by this peer */
	uint64_t features[RXR_MAX_NUM_PROTOCOLS]; /* the feature flag for each version */
	size_t tx_pending;		/* tracks pending tx ops to this peer */
	uint16_t tx_credits;		/* available send credits */
	uint16_t rx_credits;		/* available credits to allocate */
	uint64_t rnr_ts;		/* timestamp for RNR backoff tracking */
	int rnr_queued_pkt_cnt;		/* queued RNR packet count */
	int timeout_interval;		/* initial RNR timeout value */
	int rnr_timeout_exp;		/* RNR timeout exponentiation calc val */
	struct dlist_entry rnr_entry;	/* linked to rxr_ep peer_backoff_list */
	struct dlist_entry entry;	/* linked to rxr_ep peer_list */
};
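/*
 * Added note: maxproto and features[] above record the peer's protocol
 * capabilities, which are assumed to be learned from the REQ/handshake
 * exchange tracked by the RXR_PEER_* flags. A plausible lookup of the
 * feature flags advertised for protocol version 'ver' would be:
 *
 *	uint64_t feature_flags = peer->features[ver - RXR_BASE_PROTOCOL_VERSION];
 *
 * with ver expected to fall between RXR_BASE_PROTOCOL_VERSION and maxproto.
 */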
struct rxr_queued_ctrl_info {
	int type;
	int inject;
};

struct rxr_atomic_hdr {
	/* atomic_op is different from tx_op */
	uint32_t atomic_op;
	uint32_t datatype;
};

/* extra information that is not included in fi_msg_atomic
 * used by fetch atomic and compare atomic.
 *     resp stands for response
 *     comp stands for compare
 */
struct rxr_atomic_ex {
	struct iovec resp_iov[RXR_IOV_LIMIT];
	int resp_iov_count;
	struct iovec comp_iov[RXR_IOV_LIMIT];
	int comp_iov_count;
};

struct rxr_rx_entry {
	/* Must remain at the top */
	enum rxr_x_entry_type type;

	fi_addr_t addr;

	/*
	 * freestack ids used to lookup rx_entry during pkt recv
	 */
	uint32_t tx_id;
	uint32_t rx_id;
	uint32_t op;

	/*
	 * The following two variables are for emulated RMA fi_read only
	 */
	uint32_t rma_loc_tx_id;
	uint32_t rma_initiator_rx_id;

	struct rxr_atomic_hdr atomic_hdr;

	uint32_t msg_id;

	uint64_t tag;
	uint64_t ignore;

	uint64_t bytes_done;
	int64_t window;
	uint16_t credit_request;
	int credit_cts;

	uint64_t total_len;

	enum rxr_rx_comm_type state;
	struct rxr_queued_ctrl_info queued_ctrl;

	uint64_t fi_flags;
	uint16_t rxr_flags;

	size_t iov_count;
	struct iovec iov[RXR_IOV_LIMIT];

	/* App-provided reg descriptor */
	void *desc[RXR_IOV_LIMIT];

	/* iov_count on sender side, used for large message READ over shm */
	size_t rma_iov_count;
	struct fi_rma_iov rma_iov[RXR_IOV_LIMIT];

	struct fi_cq_tagged_entry cq_entry;

	/* entry is linked with rx entry lists in rxr_ep */
	struct dlist_entry entry;

	/* queued_entry is linked with rx_queued_ctrl_list in rxr_ep */
	struct dlist_entry queued_entry;

	/* Queued packets due to TX queue full or RNR backoff */
	struct dlist_entry queued_pkts;

	/*
	 * A list of rx_entries tracking FI_MULTI_RECV buffers. An rx_entry of
	 * type RXR_MULTI_RECV_POSTED that was created when the multi-recv
	 * buffer was posted is the list head, and the rx_entries of type
	 * RXR_MULTI_RECV_CONSUMER get added to the list as they consume the
	 * buffer.
	 */
	struct dlist_entry multi_recv_consumers;
	struct dlist_entry multi_recv_entry;
	struct rxr_rx_entry *master_entry;

	struct rxr_pkt_entry *unexp_pkt;
	struct rxr_pkt_entry *atomrsp_pkt;
	char *atomrsp_buf;

#if ENABLE_DEBUG
	/* linked with rx_pending_list in rxr_ep */
	struct dlist_entry rx_pending_entry;
	/* linked with rx_entry_list in rxr_ep */
	struct dlist_entry rx_entry_entry;
#endif
};

struct rxr_tx_entry {
	/* Must remain at the top */
	enum rxr_x_entry_type type;

	uint32_t op;
	fi_addr_t addr;

	/*
	 * freestack ids used to lookup tx_entry during ctrl pkt recv
	 */
	uint32_t tx_id;
	uint32_t rx_id;
	uint32_t msg_id;

	uint64_t tag;

	uint64_t bytes_acked;
	uint64_t bytes_sent;
	int64_t window;
	uint16_t credit_request;
	uint16_t credit_allocated;

	uint64_t total_len;

	enum rxr_tx_comm_type state;
	struct rxr_queued_ctrl_info queued_ctrl;

	uint64_t fi_flags;
	uint64_t send_flags;
	size_t iov_count;
	size_t iov_index;
	size_t iov_offset;
	struct iovec iov[RXR_IOV_LIMIT];

	uint64_t rma_loc_rx_id;
	uint64_t rma_window;
	size_t rma_iov_count;
	struct fi_rma_iov rma_iov[RXR_IOV_LIMIT];

	/* App-provided reg descriptor */
	void *desc[RXR_IOV_LIMIT];

	/* atomic related variables */
	struct rxr_atomic_hdr atomic_hdr;
	struct rxr_atomic_ex atomic_ex;

	/* Only used with mr threshold switch from memcpy */
	size_t iov_mr_start;
	struct fid_mr *mr[RXR_IOV_LIMIT];

	struct fi_cq_tagged_entry cq_entry;

	/* entry is linked with tx_pending_list in rxr_ep */
	struct dlist_entry entry;

	/* queued_entry is linked with tx_queued_ctrl_list in rxr_ep */
	struct dlist_entry queued_entry;

	/* Queued packets due to TX queue full or RNR backoff */
	struct dlist_entry queued_pkts;

#if ENABLE_DEBUG
	/* linked with tx_entry_list in rxr_ep */
	struct dlist_entry tx_entry_entry;
#endif
};

#define RXR_GET_X_ENTRY_TYPE(pkt_entry)	\
	(*((enum rxr_x_entry_type *)	\
	 ((unsigned char *)((pkt_entry)->x_entry))))
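/*
 * Illustrative use (added): because 'type' is the first member of both
 * rxr_tx_entry and rxr_rx_entry (see the "Must remain at the top" comments
 * above), RXR_GET_X_ENTRY_TYPE() can classify a packet's owner without
 * knowing which struct pkt_entry->x_entry points to, e.g.:
 *
 *	if (RXR_GET_X_ENTRY_TYPE(pkt_entry) == RXR_TX_ENTRY)
 *		tx_entry = pkt_entry->x_entry;
 *	else
 *		rx_entry = pkt_entry->x_entry;
 */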
enum efa_domain_type {
	EFA_DOMAIN_DGRAM = 0,
	EFA_DOMAIN_RDM,
};

struct rxr_domain {
	struct util_domain util_domain;
	enum efa_domain_type type;
	struct fid_domain *rdm_domain;
	size_t mtu_size;
	size_t addrlen;
	uint8_t mr_local;
	uint64_t rdm_mode;
	int do_progress;
	size_t cq_size;
	enum fi_resource_mgmt resource_mgmt;
};

struct rxr_ep {
	struct util_ep util_ep;

	uint8_t core_addr[RXR_MAX_NAME_LENGTH];
	size_t core_addrlen;

	/* per-version feature flag */
	uint64_t features[RXR_NUM_PROTOCOL_VERSION];

	/* per-peer information */
	struct rxr_peer *peer;

	/* free stack for reorder buffer */
	struct rxr_robuf_fs *robuf_fs;

	/* core provider fid */
	struct fid_ep *rdm_ep;
	struct fid_cq *rdm_cq;

	/* shm provider fid */
	bool use_shm;
	struct fid_ep *shm_ep;
	struct fid_cq *shm_cq;

	/*
	 * RxR rx/tx queue sizes. These may be different from the core
	 * provider's rx/tx size and will either limit the number of possible
	 * receives/sends or allow queueing.
	 */
	size_t rx_size;
	size_t tx_size;
	size_t mtu_size;
	size_t rx_iov_limit;
	size_t tx_iov_limit;

	/* core's capabilities */
	uint64_t core_caps;

	/* rx/tx queue size of core provider */
	size_t core_rx_size;
	size_t max_outstanding_tx;
	size_t core_inject_size;
	size_t max_data_payload_size;

	/* Resource management flag */
	uint64_t rm_full;

	/* application's ordering requirements */
	uint64_t msg_order;
	/* core's supported tx/rx msg_order */
	uint64_t core_msg_order;

	/* tx iov limit of core provider */
	size_t core_iov_limit;

	/* threshold to release multi_recv buffer */
	size_t min_multi_recv_size;

	/* buffer pool for send & recv */
	struct ofi_bufpool *tx_pkt_efa_pool;
	struct ofi_bufpool *rx_pkt_efa_pool;

	/*
	 * buffer pools for send & recv for shm, since shm's mtu size differs
	 * from efa's and shm does not require local memory registration
	 */
	struct ofi_bufpool *tx_pkt_shm_pool;
	struct ofi_bufpool *rx_pkt_shm_pool;

	/* staging area for unexpected and out-of-order packets */
	struct ofi_bufpool *rx_unexp_pkt_pool;
	struct ofi_bufpool *rx_ooo_pkt_pool;

#ifdef ENABLE_EFA_POISONING
	size_t tx_pkt_pool_entry_sz;
	size_t rx_pkt_pool_entry_sz;
#endif

	/* data structures to maintain rxr send/recv states */
	struct ofi_bufpool *tx_entry_pool;
	struct ofi_bufpool *rx_entry_pool;
	/* data structure to maintain read response */
	struct ofi_bufpool *readrsp_tx_entry_pool;
	/* data structure to maintain read */
	struct ofi_bufpool *read_entry_pool;
	/* data structure to maintain pkt rx map */
	struct ofi_bufpool *map_entry_pool;
	/* rxr medium message pkt_entry to rx_entry map */
	struct rxr_pkt_rx_map *pkt_rx_map;
	/* rx_entries with recv buf */
	struct dlist_entry rx_list;
	/* rx_entries without recv buf (unexpected message) */
	struct dlist_entry rx_unexp_list;
	/* rx_entries with tagged recv buf */
	struct dlist_entry rx_tagged_list;
	/* rx_entries without tagged recv buf (unexpected message) */
	struct dlist_entry rx_unexp_tagged_list;
	/* list of pre-posted recv buffers */
	struct dlist_entry rx_posted_buf_list;
	/* list of pre-posted recv buffers for shm */
	struct dlist_entry rx_posted_buf_shm_list;
	/* tx entries with queued messages */
	struct dlist_entry tx_entry_queued_list;
	/* rx entries with queued messages */
	struct dlist_entry rx_entry_queued_list;
	/* tx_entries with data to be sent (large messages) */
	struct dlist_entry tx_pending_list;
	/* read entries with data to be read */
	struct dlist_entry read_pending_list;
	/* rxr_peer entries that are in backoff due to RNR */
	struct dlist_entry peer_backoff_list;
	/* rxr_peer entries with an allocated robuf */
	struct dlist_entry peer_list;

#if ENABLE_DEBUG
	/* rx_entries waiting for data to arrive (large messages) */
	struct dlist_entry rx_pending_list;
	/* count of rx_pending_list */
	size_t rx_pending;
	/* rx packets being processed or waiting to be processed */
	struct dlist_entry rx_pkt_list;
	/* tx packets waiting for send completion */
	struct dlist_entry tx_pkt_list;
	/* track allocated rx_entries and tx_entries for endpoint cleanup */
	struct dlist_entry rx_entry_list;
	struct dlist_entry tx_entry_list;

	size_t sends;
	size_t send_comps;
	size_t failed_send_comps;
	size_t recv_comps;
#endif
	/* number of posted buffers for shm */
	size_t posted_bufs_shm;
	size_t rx_bufs_shm_to_post;
	/* number of posted buffers */
	size_t posted_bufs_efa;
	size_t rx_bufs_efa_to_post;
	/* number of buffers available for large messages */
	size_t available_data_bufs;
	/* Timestamp of when available_data_bufs was exhausted. */
	uint64_t available_data_bufs_ts;

	/* number of outstanding sends */
	size_t tx_pending;
};

#define rxr_rx_flags(rxr_ep) ((rxr_ep)->util_ep.rx_op_flags)
#define rxr_tx_flags(rxr_ep) ((rxr_ep)->util_ep.tx_op_flags)

/*
 * Control header with completion data. CQ data length is static.
 */
#define RXR_CQ_DATA_SIZE (8)

static inline void rxr_copy_shm_cq_entry(struct fi_cq_tagged_entry *cq_tagged_entry,
					 struct fi_cq_data_entry *shm_cq_entry)
{
	cq_tagged_entry->op_context = shm_cq_entry->op_context;
	cq_tagged_entry->flags = shm_cq_entry->flags;
	cq_tagged_entry->len = shm_cq_entry->len;
	cq_tagged_entry->buf = shm_cq_entry->buf;
	cq_tagged_entry->data = shm_cq_entry->data;
	cq_tagged_entry->tag = 0; /* no tag for RMA */
}

static inline struct rxr_peer *rxr_ep_get_peer(struct rxr_ep *ep,
					       fi_addr_t addr)
{
	return &ep->peer[addr];
}

static inline void rxr_ep_peer_init_rx(struct rxr_ep *ep, struct rxr_peer *peer)
{
	assert(!peer->rx_init);

	peer->robuf = freestack_pop(ep->robuf_fs);
	peer->robuf = ofi_recvwin_buf_alloc(peer->robuf,
					    rxr_env.recvwin_size);
	assert(peer->robuf);
	dlist_insert_tail(&peer->entry, &ep->peer_list);
	peer->rx_credits = rxr_env.rx_window_size;
	peer->rx_init = 1;
}

static inline void rxr_ep_peer_init_tx(struct rxr_peer *peer)
{
	assert(!peer->tx_init);
	peer->tx_credits = rxr_env.tx_max_credits;
	peer->tx_init = 1;
}

struct rxr_rx_entry *rxr_ep_get_rx_entry(struct rxr_ep *ep,
					 const struct fi_msg *msg,
					 uint64_t tag,
					 uint64_t ignore,
					 uint32_t op,
					 uint64_t flags);

struct rxr_rx_entry *rxr_ep_rx_entry_init(struct rxr_ep *ep,
					  struct rxr_rx_entry *rx_entry,
					  const struct fi_msg *msg,
					  uint64_t tag,
					  uint64_t ignore,
					  uint32_t op,
					  uint64_t flags);

void rxr_tx_entry_init(struct rxr_ep *rxr_ep, struct rxr_tx_entry *tx_entry,
		       const struct fi_msg *msg, uint32_t op, uint64_t flags);

struct rxr_tx_entry *rxr_ep_alloc_tx_entry(struct rxr_ep *rxr_ep,
					   const struct fi_msg *msg,
					   uint32_t op,
					   uint64_t tag,
					   uint64_t flags);

int rxr_tx_entry_mr_dereg(struct rxr_tx_entry *tx_entry);

static inline void rxr_release_tx_entry(struct rxr_ep *ep,
					struct rxr_tx_entry *tx_entry)
{
#if ENABLE_DEBUG
	dlist_remove(&tx_entry->tx_entry_entry);
#endif
	assert(dlist_empty(&tx_entry->queued_pkts));
#ifdef ENABLE_EFA_POISONING
	rxr_poison_mem_region((uint32_t *)tx_entry,
			      sizeof(struct rxr_tx_entry));
#endif
	tx_entry->state = RXR_TX_FREE;
	ofi_buf_free(tx_entry);
}

static inline void rxr_release_rx_entry(struct rxr_ep *ep,
					struct rxr_rx_entry *rx_entry)
{
#if ENABLE_DEBUG
	dlist_remove(&rx_entry->rx_entry_entry);
#endif
	assert(dlist_empty(&rx_entry->queued_pkts));
#ifdef ENABLE_EFA_POISONING
	rxr_poison_mem_region((uint32_t *)rx_entry,
			      sizeof(struct rxr_rx_entry));
#endif
	rx_entry->state = RXR_RX_FREE;
	ofi_buf_free(rx_entry);
}
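/*
 * Added note: the matching helpers below follow the usual libfabric
 * tagged-matching rules: an entry posted with FI_ADDR_UNSPEC matches any
 * source, and bits set in 'ignore' are excluded from the tag comparison.
 * For example, with tag = 0x1200 and ignore = 0x00ff, an incoming tag of
 * 0x12ab matches while 0x1300 does not.
 */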
static inline int rxr_match_addr(fi_addr_t addr, fi_addr_t match_addr)
{
	return (addr == FI_ADDR_UNSPEC || addr == match_addr);
}

static inline int rxr_match_tag(uint64_t tag, uint64_t ignore,
				uint64_t match_tag)
{
	return ((tag | ignore) == (match_tag | ignore));
}

static inline void rxr_ep_inc_tx_pending(struct rxr_ep *ep,
					 struct rxr_peer *peer)
{
	ep->tx_pending++;
	peer->tx_pending++;
#if ENABLE_DEBUG
	ep->sends++;
#endif
}

static inline void rxr_ep_dec_tx_pending(struct rxr_ep *ep,
					 struct rxr_peer *peer,
					 int failed)
{
	ep->tx_pending--;
	peer->tx_pending--;
#if ENABLE_DEBUG
	if (failed)
		ep->failed_send_comps++;
#endif
}

static inline size_t rxr_get_rx_pool_chunk_cnt(struct rxr_ep *ep)
{
	return MIN(ep->core_rx_size, ep->rx_size);
}

static inline size_t rxr_get_tx_pool_chunk_cnt(struct rxr_ep *ep)
{
	return MIN(ep->max_outstanding_tx, ep->tx_size);
}

static inline int rxr_need_sas_ordering(struct rxr_ep *ep)
{
	return ep->msg_order & FI_ORDER_SAS;
}

/* Initialization functions */
void rxr_reset_rx_tx_to_core(const struct fi_info *user_info,
			     struct fi_info *core_info);
int rxr_get_lower_rdm_info(uint32_t version, const char *node,
			   const char *service, uint64_t flags,
			   const struct util_prov *util_prov,
			   const struct fi_info *util_hints,
			   struct fi_info **core_info);
int rxr_fabric(struct fi_fabric_attr *attr,
	       struct fid_fabric **fabric, void *context);
int rxr_domain_open(struct fid_fabric *fabric, struct fi_info *info,
		    struct fid_domain **dom, void *context);
int rxr_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
		struct fid_cq **cq_fid, void *context);
int rxr_endpoint(struct fid_domain *domain, struct fi_info *info,
		 struct fid_ep **ep, void *context);

/* EP sub-functions */
void rxr_ep_progress(struct util_ep *util_ep);
void rxr_ep_progress_internal(struct rxr_ep *rxr_ep);
int rxr_ep_post_buf(struct rxr_ep *ep, uint64_t flags,
		    enum rxr_lower_ep_type lower_ep);

int rxr_ep_set_tx_credit_request(struct rxr_ep *rxr_ep,
				 struct rxr_tx_entry *tx_entry);

int rxr_ep_tx_init_mr_desc(struct rxr_domain *rxr_domain,
			   struct rxr_tx_entry *tx_entry,
			   int mr_iov_start, uint64_t access);

void rxr_prepare_desc_send(struct rxr_domain *rxr_domain,
			   struct rxr_tx_entry *tx_entry);

struct rxr_rx_entry *rxr_ep_lookup_mediumrtm_rx_entry(struct rxr_ep *ep,
						      struct rxr_pkt_entry *pkt_entry);

void rxr_ep_record_mediumrtm_rx_entry(struct rxr_ep *ep,
				      struct rxr_pkt_entry *pkt_entry,
				      struct rxr_rx_entry *rx_entry);

struct rxr_rx_entry *rxr_ep_alloc_unexp_rx_entry_for_msgrtm(struct rxr_ep *ep,
							    struct rxr_pkt_entry **pkt_entry);

struct rxr_rx_entry *rxr_ep_alloc_unexp_rx_entry_for_tagrtm(struct rxr_ep *ep,
							    struct rxr_pkt_entry **pkt_entry);

struct rxr_rx_entry *rxr_ep_split_rx_entry(struct rxr_ep *ep,
					   struct rxr_rx_entry *posted_entry,
					   struct rxr_rx_entry *consumer_entry,
					   struct rxr_pkt_entry *pkt_entry);

int rxr_ep_efa_addr_to_str(const void *addr, char *temp_name);

/* CQ sub-functions */
int rxr_cq_handle_rx_error(struct rxr_ep *ep, struct rxr_rx_entry *rx_entry,
			   ssize_t prov_errno);
int rxr_cq_handle_tx_error(struct rxr_ep *ep, struct rxr_tx_entry *tx_entry,
			   ssize_t prov_errno);
int rxr_cq_handle_cq_error(struct rxr_ep *ep, ssize_t err);

void rxr_cq_write_rx_completion(struct rxr_ep *ep,
				struct rxr_rx_entry *rx_entry);

void rxr_cq_handle_rx_completion(struct rxr_ep *ep,
				 struct rxr_pkt_entry *pkt_entry,
				 struct rxr_rx_entry *rx_entry);

void rxr_cq_write_tx_completion(struct rxr_ep *ep,
				struct rxr_tx_entry *tx_entry);

void rxr_cq_handle_tx_completion(struct rxr_ep *ep,
				 struct rxr_tx_entry *tx_entry);
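/*
 * Added note: rxr_cq_reorder_msg and rxr_cq_proc_pending_items_in_recvwin
 * below are assumed to use the peer's robuf receive window (sized by
 * rxr_env.recvwin_size) to hold packets whose msg_id arrives out of order:
 * the former decides whether a packet can be processed immediately or must
 * wait, and the latter drains packets that have become in-order, preserving
 * FI_ORDER_SAS semantics when the application requires them (see
 * rxr_need_sas_ordering()).
 */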
void rxr_cq_handle_shm_completion(struct rxr_ep *ep,
				  struct fi_cq_data_entry *cq_entry,
				  fi_addr_t src_addr);

int rxr_cq_reorder_msg(struct rxr_ep *ep,
		       struct rxr_peer *peer,
		       struct rxr_pkt_entry *pkt_entry);

void rxr_cq_proc_pending_items_in_recvwin(struct rxr_ep *ep,
					  struct rxr_peer *peer);

void rxr_cq_handle_shm_rma_write_data(struct rxr_ep *ep,
				      struct fi_cq_data_entry *shm_comp,
				      fi_addr_t src_addr);

/* Aborts if unable to write to the eq */
static inline void efa_eq_write_error(struct util_ep *ep, ssize_t err,
				      ssize_t prov_errno)
{
	struct fi_eq_err_entry err_entry;
	int ret = -FI_ENOEQ;

	FI_WARN(&rxr_prov, FI_LOG_EQ, "Writing error %s to EQ.\n",
		fi_strerror(err));
	if (ep->eq) {
		memset(&err_entry, 0, sizeof(err_entry));
		err_entry.err = err;
		err_entry.prov_errno = prov_errno;
		ret = fi_eq_write(&ep->eq->eq_fid, FI_NOTIFY,
				  &err_entry, sizeof(err_entry),
				  UTIL_FLAG_ERROR);

		if (ret == sizeof(err_entry))
			return;
	}

	FI_WARN(&rxr_prov, FI_LOG_EQ,
		"Unable to write to EQ: %s. err: %s (%zd) prov_errno: %s (%zd)\n",
		fi_strerror(-ret), fi_strerror(err), err,
		fi_strerror(prov_errno), prov_errno);
	fprintf(stderr,
		"Unable to write to EQ: %s. err: %s (%zd) prov_errno: %s (%zd) %s:%d\n",
		fi_strerror(-ret), fi_strerror(err), err,
		fi_strerror(prov_errno), prov_errno, __FILE__, __LINE__);
	abort();
}

static inline struct rxr_domain *rxr_ep_domain(struct rxr_ep *ep)
{
	return container_of(ep->util_ep.domain, struct rxr_domain, util_domain);
}

static inline uint8_t rxr_ep_mr_local(struct rxr_ep *ep)
{
	struct rxr_domain *domain = container_of(ep->util_ep.domain,
						 struct rxr_domain,
						 util_domain);
	return domain->mr_local;
}

/*
 * Today we only check CQ resources; in the future we will check ctx and
 * other resources as well.
 */
static inline uint64_t is_tx_res_full(struct rxr_ep *ep)
{
	return ep->rm_full & RXR_RM_TX_CQ_FULL;
}

static inline uint64_t is_rx_res_full(struct rxr_ep *ep)
{
	return ep->rm_full & RXR_RM_RX_CQ_FULL;
}

static inline void rxr_rm_rx_cq_check(struct rxr_ep *ep, struct util_cq *rx_cq)
{
	fastlock_acquire(&rx_cq->cq_lock);
	if (ofi_cirque_isfull(rx_cq->cirq))
		ep->rm_full |= RXR_RM_RX_CQ_FULL;
	else
		ep->rm_full &= ~RXR_RM_RX_CQ_FULL;
	fastlock_release(&rx_cq->cq_lock);
}

static inline void rxr_rm_tx_cq_check(struct rxr_ep *ep, struct util_cq *tx_cq)
{
	fastlock_acquire(&tx_cq->cq_lock);
	if (ofi_cirque_isfull(tx_cq->cirq))
		ep->rm_full |= RXR_RM_TX_CQ_FULL;
	else
		ep->rm_full &= ~RXR_RM_TX_CQ_FULL;
	fastlock_release(&tx_cq->cq_lock);
}

static inline bool rxr_peer_timeout_expired(struct rxr_ep *ep,
					    struct rxr_peer *peer,
					    uint64_t ts)
{
	return (ts >= (peer->rnr_ts + MIN(rxr_env.max_timeout,
					  peer->timeout_interval *
					  (1 << peer->rnr_timeout_exp))));
}

/* Performance counter declarations */
#ifdef RXR_PERF_ENABLED
#define RXR_PERF_FOREACH(DECL)	\
	DECL(perf_rxr_tx),	\
	DECL(perf_rxr_recv),	\
	DECL(rxr_perf_size)

enum rxr_perf_counters {
	RXR_PERF_FOREACH(OFI_ENUM_VAL)
};

extern const char *rxr_perf_counters_str[];

static inline void rxr_perfset_start(struct rxr_ep *ep, size_t index)
{
	struct rxr_domain *domain = rxr_ep_domain(ep);
	struct rxr_fabric *fabric = container_of(domain->util_domain.fabric,
						 struct rxr_fabric,
						 util_fabric);
	ofi_perfset_start(&fabric->perf_set, index);
}

static inline void rxr_perfset_end(struct rxr_ep *ep, size_t index)
{
	struct rxr_domain *domain = rxr_ep_domain(ep);
	struct rxr_fabric *fabric = container_of(domain->util_domain.fabric,
						 struct rxr_fabric,
						 util_fabric);
	ofi_perfset_end(&fabric->perf_set, index);
}
#else
#define rxr_perfset_start(ep, index) do {} while (0)
#define rxr_perfset_end(ep, index) do {} while (0)
#endif

#endif /* _RXR_H_ */