/* * Copyright (C) 2016 by Argonne National Laboratory. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU * General Public License (GPL) Version 2, available from the file * COPYING in the main directory of this source tree, or the * BSD license below: * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following * conditions are met: * * - Redistributions of source code must retain the above * copyright notice, this list of conditions and the following * disclaimer. * * - Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials * provided with the distribution. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #ifndef _FI_BGQ_DIRECT_EP_H_ #define _FI_BGQ_DIRECT_EP_H_ #define FABRIC_DIRECT_ENDPOINT 1 #include #include #include #include "rdma/bgq/fi_bgq_compiler.h" #include "rdma/bgq/fi_bgq_hwi.h" #include "rdma/bgq/fi_bgq_spi.h" #include "rdma/fi_direct_eq.h" #include "rdma/bgq/fi_bgq_mu.h" #include "rdma/bgq/fi_bgq_progress.h" #include "rdma/bgq/fi_bgq_flight_recorder.h" #define FI_BGQ_L2FIFO_CTL_SIZE (1024) #define IS_TAG (0) #define IS_MSG (1) // #define FI_BGQ_TRACE 1 // #define FI_BGQ_REMOTE_COMPLETION enum fi_bgq_ep_state { FI_BGQ_EP_UNINITIALIZED = 0, FI_BGQ_EP_INITITALIZED_DISABLED, FI_BGQ_EP_INITITALIZED_ENABLED }; struct fi_bgq_stx { struct fid_stx stx_fid; /* 80 bytes */ struct fi_bgq_domain *domain; struct fi_tx_attr attr; struct fi_bgq_spi_injfifo injfifo; struct fi_bgq_spi_injfifo rgetfifo; MUSPI_InjFifoSubGroup_t injfifo_subgroup; MUSPI_InjFifoSubGroup_t rgetfifo_subgroup; struct l2atomic_counter ref_cnt; struct l2atomic_lock lock; }; struct rx_operation { MUSPI_RecFifo_t *rfifo; struct l2atomic_fifo fifo; struct { union fi_bgq_context *head; union fi_bgq_context *tail; } mq; struct { struct fi_bgq_mu_packet *head; struct fi_bgq_mu_packet *tail; struct fi_bgq_mu_packet *free; } ue; }; struct fi_bgq_ep_tx { struct fi_bgq_spi_injfifo injfifo; /* cloned from stx; 88 bytes */ uint64_t unused0[5]; /* == L2 CACHE LINE == */ uint64_t op_flags; struct fi_bgq_cq *send_cq; struct fi_bgq_cntr *send_cntr; struct fi_bgq_cntr *write_cntr; uint64_t unused[12]; /* == L2 CACHE LINE == */ struct { /* three l2 cache lines */ MUHWI_Descriptor_t send_model; MUHWI_Descriptor_t local_completion_model; /* only "local completion eager" */ MUHWI_Descriptor_t rzv_model[2]; /* [0]=="internode"; [1]=="intranode" */ MUHWI_Descriptor_t remote_completion_model; uint8_t unused[64]; } send __attribute((aligned(L2_CACHE_LINE_SIZE))); /* == L2 CACHE LINE == */ struct { struct { MUHWI_Descriptor_t rget_model; MUHWI_Descriptor_t dput_model; } direct; /* == L2 CACHE LINE == */ struct { MUHWI_Descriptor_t mfifo_model; MUHWI_Descriptor_t dput_model; } emulation; /* == L2 CACHE LINE == */ MUHWI_Descriptor_t cntr_model; MUHWI_Descriptor_t cq_model; /* == L2 CACHE LINE == */ uint64_t global_one_paddr; uint64_t global_zero_paddr; 
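
			/*
			 * The '== L2 CACHE LINE ==' markers group members that are touched
			 * together so that each group begins on an L2 cache-line boundary
			 * (see the aligned(L2_CACHE_LINE_SIZE) attributes).  A minimal sketch
			 * of how such a layout could be checked at compile time, assuming a
			 * C11 compiler and placement after the struct definition (illustrative
			 * only, not used by this header):
			 *
			 *   #include <stddef.h>
			 *   _Static_assert(offsetof(struct fi_bgq_ep_tx, send) % L2_CACHE_LINE_SIZE == 0,
			 *           "'send' descriptor models must begin on an L2 cache line");
			 */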
uint8_t unused[48]; } read __attribute((aligned(L2_CACHE_LINE_SIZE))); /* == L2 CACHE LINE == */ struct { struct { MUHWI_Descriptor_t dput_model; MUHWI_Descriptor_t unused; } direct; /* == L2 CACHE LINE == */ struct { MUHWI_Descriptor_t mfifo_model; MUHWI_Descriptor_t rget_model; /* == L2 CACHE LINE == */ MUHWI_Descriptor_t dput_model; MUHWI_Descriptor_t cntr_model; } emulation; } write __attribute((aligned(L2_CACHE_LINE_SIZE))); /* == L2 CACHE LINE == */ struct { struct { MUHWI_Descriptor_t mfifo_model; MUHWI_Descriptor_t unused; /* == L2 CACHE LINE == */ struct { MUHWI_Descriptor_t mfifo_model; MUHWI_Descriptor_t cntr_model; /* == L2 CACHE LINE == */ MUHWI_Descriptor_t cq_model; MUHWI_Descriptor_t unused; } fence; } emulation; } atomic __attribute((aligned(L2_CACHE_LINE_SIZE))); /* == L2 CACHE LINE == */ uint64_t caps; uint64_t mode; enum fi_bgq_ep_state state; struct fi_bgq_stx *stx; struct fi_bgq_stx exclusive_stx; ssize_t index; } __attribute((aligned(L2_CACHE_LINE_SIZE))); struct fi_bgq_ep_rx { /* == L2 CACHE LINE == */ /* 'post' data is used when a thread is INITIATING recv operations */ struct { uint64_t op_flags; struct l2atomic_fifo_producer match[2]; /* 0 == 'tag', 1 == 'msg' */ /* TODO - use an enum */ struct l2atomic_fifo_producer control; } post __attribute((aligned(L2_CACHE_LINE_SIZE))); /* == L2 CACHE LINE == */ /* 'poll' data is used when a thread is making PROGRESS on recv operations */ struct { MUSPI_RecFifo_t *muspi_recfifo; uintptr_t pad; /* 'rfifo' data is used to poll a mu reception fifo */ struct { struct l2atomic_fifo_consumer match; struct { union fi_bgq_context *head; union fi_bgq_context *tail; } mq; struct { struct fi_bgq_mu_packet *head; struct fi_bgq_mu_packet *tail; struct fi_bgq_mu_packet *free; } ue; } rfifo[2]; /* 0 == 'tag', 1 == 'msg' */ struct fi_bgq_spi_injfifo injfifo; /* == L2 CACHE LINE == */ struct { MUHWI_Descriptor_t rget_model[2]; /* [0]=="internode"; [1]=="intranode" */ /* == L2 CACHE LINE == */ MUHWI_Descriptor_t dput_model[2]; /* [0]=="internode"; [1]=="intranode" */ /* == L2 CACHE LINE == */ MUHWI_Descriptor_t dput_completion_model; MUHWI_Descriptor_t multi_recv_ack_model; } rzv __attribute((aligned(L2_CACHE_LINE_SIZE))); /* TODO reorganize rzv for better cache layout */ /* == L2 CACHE LINE == */ /* TODO reorganize ack (with rzv) for better cache layout */ MUHWI_Descriptor_t ack_model[2]; /* [0]=="internode"; [1]=="intranode" */ /* == L2 CACHE LINE == */ MUHWI_Descriptor_t atomic_dput_model; struct fi_bgq_bat_entry *bat; uint64_t pad_1[7]; /* == L2 CACHE LINE == */ MUHWI_Descriptor_t atomic_cntr_update_model[2]; /* [0]=="internode"; [1]=="intranode" */ /* == L2 CACHE LINE == */ struct fi_bgq_cq *recv_cq; struct fi_bgq_cntr *write_cntr; uint64_t min_multi_recv; /* -- non-critical -- */ struct fi_bgq_domain *domain; struct l2atomic_fifo_consumer control; MUSPI_InjFifoSubGroup_t injfifo_subgroup; } poll __attribute((aligned(L2_CACHE_LINE_SIZE))); uint64_t caps; uint64_t mode; uint64_t op_flags; size_t total_buffered_recv; /* TODO - is this only used by receive operations? */ struct fi_bgq_sep *sep; struct fi_bgq_ep *srx; union fi_bgq_addr self; /* -- non-critical -- */ ssize_t index; uint64_t min_multi_recv; void *l2atomic_memptr; enum fi_bgq_ep_state state; } __attribute((aligned(L2_CACHE_LINE_SIZE))); /* * The 'fi_bgq_ep' struct defines an endpoint with a single tx context and a * single rx context. The tx context is only valid if the FI_READ, FI_WRITE, * or FI_SEND capability is specified. 
The rx context is only valid if the
 * FI_RECV, FI_REMOTE_READ, or FI_REMOTE_WRITE capabilities are specified.
 *
 * A 'scalable tx context' is simply an endpoint structure with only the
 * tx flags specified, and a 'scalable rx context' is simply an endpoint
 * structure with only the rx flags specified.
 *
 * As such, multiple OFI 'classes' share this endpoint structure:
 *   FI_CLASS_EP
 *   FI_CLASS_TX_CTX
 *   -- no FI_CLASS_STX_CTX
 *   FI_CLASS_RX_CTX
 *   -- no FI_CLASS_SRX_CTX
 */
struct fi_bgq_ep {

	struct fid_ep		ep_fid;		/* 80 bytes */
	struct l2atomic_lock	lock;		/* 16 bytes */
	uint32_t		threading;
	uint32_t		av_type;
	uint32_t		mr_mode;
	uint8_t			unused[20];

	/* == L2 CACHE LINE == */

	struct fi_bgq_ep_tx	tx;
	struct fi_bgq_ep_rx	rx;

	struct fi_bgq_cntr	*read_cntr;
	struct fi_bgq_cntr	*write_cntr;
	struct fi_bgq_cntr	*send_cntr;
	struct fi_bgq_cntr	*recv_cntr;

	struct fi_bgq_cq	*send_cq;
	struct fi_bgq_cq	*recv_cq;

	struct fi_bgq_domain	*domain;
	void			*mem;

	struct fi_bgq_av	*av;
	struct fi_bgq_sep	*sep;

	struct {
		volatile uint64_t	enabled;
		volatile uint64_t	active;
		pthread_t		thread;
	} async;

	enum fi_bgq_ep_state	state;

} __attribute((aligned(L2_CACHE_LINE_SIZE)));

/*
 * A 'scalable endpoint' may not be directly specified in data movement
 * functions, such as fi_tsend(), as it is only a container for multiple
 * tx and rx contexts.
 *
 * The scalable contexts share certain resources, such as the address vector.
 */
struct fi_bgq_sep {
	struct fid_ep		ep_fid;

	struct fi_bgq_domain	*domain;
	struct fi_bgq_av	*av;
	struct fi_info		*info;
	void			*memptr;
	struct l2atomic_counter	ref_cnt;

} __attribute((aligned(L2_CACHE_LINE_SIZE)));

void fi_bgq_ep_progress_manual_cancel (struct fi_bgq_ep * bgq_ep,
		const uint64_t cancel_context);

int fi_bgq_ep_progress_manual_recv (struct fi_bgq_ep *bgq_ep,
		const uint64_t is_msg,
		union fi_bgq_context * context,
		const uint64_t rx_op_flags,
		const uint64_t is_context_ext);

/* See: process_mfifo_context() */
int fi_bgq_ep_progress_manual_recv_fast (struct fi_bgq_ep *bgq_ep,
		const uint64_t is_msg,
		union fi_bgq_context * context);

int fi_bgq_ep_progress_manual (struct fi_bgq_ep *bgq_ep);

static inline
int fi_bgq_check_endpoint(struct fi_bgq_ep *bgq_ep, enum fi_av_type av_type)
{
#ifdef DEBUG
	if (!bgq_ep)
		return -FI_EINVAL;
	if (bgq_ep->state != FI_BGQ_EP_INITITALIZED_ENABLED)
		return -FI_EINVAL;
	if (av_type == FI_AV_UNSPEC)
		return -FI_EINVAL;
	if (av_type == FI_AV_MAP && bgq_ep->av_type != FI_AV_MAP)
		return -FI_EINVAL;
	if (av_type == FI_AV_TABLE && bgq_ep->av_type != FI_AV_TABLE)
		return -FI_EINVAL;

	/* currently, only FI_AV_MAP is supported */
	if (av_type == FI_AV_TABLE) {
		return -FI_ENOSYS;
	} else if (av_type != FI_AV_MAP) {
		return -FI_EINVAL;
	}
#endif
	return 0;
}

static inline
int fi_bgq_ep_tx_check(struct fi_bgq_ep_tx * tx, enum fi_av_type av_type)
{
#ifdef DEBUG
	if (!tx)
		return -FI_EINVAL;
	if (tx->state != FI_BGQ_EP_INITITALIZED_ENABLED)
		return -FI_EINVAL;
	if (av_type == FI_AV_UNSPEC)
		return -FI_EINVAL;
	if (av_type == FI_AV_MAP && tx->av_type != FI_AV_MAP)
		return -FI_EINVAL;
	if (av_type == FI_AV_TABLE && tx->av_type != FI_AV_TABLE)
		return -FI_EINVAL;

	/* currently, only FI_AV_MAP is supported */
	if (av_type == FI_AV_TABLE)
		return -FI_ENOSYS;
	if (av_type != FI_AV_MAP)
		return -FI_EINVAL;
#endif
	return 0;
}

/*
 * NOTE-CACHE: This code touches 4 cachelines in the code path - not including
 * cachelines used for the actual injection fifo descriptor and the hardware
 * SRAM - when the message length <= 8 bytes.
* * NOTE-CACHE: This code touches 5 cachelines in the code path - not including * cachelines used for the actual injection fifo descriptor, the hardware SRAM, * and the immediate payload - when the message length > 8 bytes. * * TODO - reorganize fi_bgq structures and implement a slimmed down version of * MUSPI_InjFifoAdvanceDesc to reduce cache pressure. The "<=8" code path * should only need to touch 2 cachelines and the ">8" code path should only * need to touch 3 cachelines. */ static inline ssize_t fi_bgq_inject_generic(struct fid_ep *ep, const void *buf, size_t len, fi_addr_t dest_addr, uint64_t tag, const uint32_t data, int lock_required, const unsigned is_msg) { assert(is_msg == 0 || is_msg == 1); struct fi_bgq_ep *bgq_ep = container_of(ep, struct fi_bgq_ep, ep_fid); ssize_t ret; ret = fi_bgq_ep_tx_check(&bgq_ep->tx, FI_BGQ_FABRIC_DIRECT_AV); if (ret) return ret; /* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */ ret = fi_bgq_lock_if_required(&bgq_ep->lock, lock_required); if (ret) return ret; /* get the destination bgq torus address */ const union fi_bgq_addr bgq_dst_addr = {.fi=dest_addr}; /* eager with lookaside payload buffer and no completion */ /* busy-wait until a fifo slot is available ... */ MUHWI_Descriptor_t * send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo); /* copy the descriptor model into the injection fifo */ qpx_memcpy64((void*)send_desc, (const void*)&bgq_ep->tx.send.send_model); /* set the destination torus address and fifo map */ send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi); send_desc->Torus_FIFO_Map = fi_bgq_addr_get_fifo_map(dest_addr); send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id = fi_bgq_addr_rec_fifo_id(dest_addr); union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader; if (is_msg) { fi_bgq_mu_packet_type_set(hdr, FI_BGQ_MU_PACKET_TYPE_EAGER); } /* locate the payload lookaside slot */ uint64_t payload_paddr = 0; void *payload_vaddr = fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->tx.injfifo, send_desc, &payload_paddr); send_desc->Pa_Payload = payload_paddr; send_desc->Message_Length = len; if (len) memcpy(payload_vaddr, buf, len); /* TODO use a qpx-optimized memcpy instead */ hdr->pt2pt.send.message_length = len; hdr->pt2pt.ofi_tag = tag; hdr->pt2pt.immediate_data = data; #ifdef FI_BGQ_TRACE fprintf(stderr,"fi_bgq_inject_generic dest addr is:\n"); FI_BGQ_ADDR_DUMP((fi_addr_t *)&dest_addr); #endif MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo); /* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */ ret = fi_bgq_unlock_if_required(&bgq_ep->lock, lock_required); if (ret) return ret; return 0; } static inline ssize_t fi_bgq_send_generic_flags(struct fid_ep *ep, const void *buf, size_t len, void *desc, fi_addr_t dest_addr, uint64_t tag, void *context, const uint32_t data, int lock_required, const unsigned is_msg, const unsigned is_contiguous, const unsigned override_flags, uint64_t tx_op_flags) { #ifdef FI_BGQ_TRACE fprintf(stderr,"fi_bgq_send_generic_flags starting\n"); #endif assert(is_msg == 0 || is_msg == 1); assert(is_contiguous == 0 || is_contiguous == 1); struct fi_bgq_ep *bgq_ep = container_of(ep, struct fi_bgq_ep, ep_fid); ssize_t ret; ret = fi_bgq_ep_tx_check(&bgq_ep->tx, FI_BGQ_FABRIC_DIRECT_AV); if (ret) return ret; /* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */ ret = fi_bgq_lock_if_required(&bgq_ep->lock, lock_required); if (ret) return ret; 
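
	/*
	 * The remainder of this routine selects between two wire protocols based
	 * on the total transfer size: payloads small enough to fit in the
	 * receiver's buffered-receive space are sent 'eager' (the payload is
	 * copied and travels with the packet), while larger transfers use
	 * 'rendezvous' (only an iovec of source physical addresses is sent and
	 * the receiver pulls the data with remote gets).  A minimal sketch of the
	 * decision, assuming the FI_BGQ_TOTAL_BUFFERED_RECV threshold used below
	 * (illustrative only; 'inject_eager' and 'inject_rendezvous' are
	 * hypothetical names for the two code paths that follow):
	 *
	 *   if (xfer_len <= FI_BGQ_TOTAL_BUFFERED_RECV)
	 *           inject_eager(send_desc, buf, xfer_len);      // copy payload now
	 *   else
	 *           inject_rendezvous(send_desc, buf, xfer_len); // receiver rgets later
	 */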
	/* get the destination bgq torus address */
	const union fi_bgq_addr bgq_dst_addr = {.fi=dest_addr};

	size_t xfer_len = 0;
	if (is_contiguous) xfer_len = len;
	else {
		/* non-contiguous: 'buf' is an iovec array and 'len' is the iovec count */
		size_t i;
		const struct iovec * iov = (const struct iovec *)buf;
		for (i=0; i<len; ++i) xfer_len += iov[i].iov_len;
	}

	if (!override_flags) tx_op_flags = bgq_ep->tx.op_flags;

	/* busy-wait until a fifo slot is available ... */
	MUHWI_Descriptor_t * send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo);

	if (xfer_len <= FI_BGQ_TOTAL_BUFFERED_RECV) {	/* eager */

		/* copy the descriptor model into the injection fifo */
		qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.send_model);

		/* set the destination torus address and fifo map */
		send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi);
		send_desc->Torus_FIFO_Map = fi_bgq_addr_get_fifo_map(dest_addr);
		send_desc->Message_Length = xfer_len;
		send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id = fi_bgq_addr_rec_fifo_id(dest_addr);

		if (is_contiguous && ((tx_op_flags & FI_INJECT) == 0)) {
			fi_bgq_cnk_vaddr2paddr(buf, len, &send_desc->Pa_Payload);
		} else {
			/* locate the payload lookaside slot */
			uint64_t payload_paddr = 0;
			uintptr_t payload_vaddr = (uintptr_t) fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->tx.injfifo,
					send_desc, &payload_paddr);
			send_desc->Pa_Payload = payload_paddr;

			if (is_contiguous) {
				if (len) memcpy((void*)payload_vaddr, buf, len);
			} else {
				/* pack each iovec element into the lookaside payload buffer */
				unsigned i;
				const struct iovec * iov = (const struct iovec *)buf;
				for (i=0; i<len; ++i) {
					memcpy((void*)payload_vaddr, iov[i].iov_base, iov[i].iov_len);
					payload_vaddr += iov[i].iov_len;
				}
			}
		}

		union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader;
		hdr->pt2pt.send.message_length = xfer_len;
		hdr->pt2pt.ofi_tag = tag;
		hdr->pt2pt.immediate_data = data;

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"eager sending to dest:\n");
		FI_BGQ_ADDR_DUMP(&dest_addr);
#endif
		if (is_msg) {
			/* clear the 'TAG' bit in the packet type */
			fi_bgq_mu_packet_type_set(hdr, FI_BGQ_MU_PACKET_TYPE_EAGER);
		}

		MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);

#ifdef FI_BGQ_REMOTE_COMPLETION
		if (tx_op_flags & (FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)) {

			/*
			 * TODO - this code is buggy and results in a hang at job completion for 'cpi'
			 *
			 * Suspect that remote processes are exiting before the 'request for ack'
			 * remote completion packet is received, then the process that issued the
			 * 'request for ack' message will hang because the ack is never received.
			 *
			 * Alternative implementations:
			 *   1. Do not support remote completions on bgq (current)
			 *   2.
Support remote completions via rendezvous protocol */ /* inject the 'remote completion' descriptor */ send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo); /* copy the descriptor model into the injection fifo */ qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.remote_completion_model); /* initialize the completion entry */ assert(context); assert(((uintptr_t)context & 0x07ull) == 0); /* must be 8 byte aligned */ union fi_bgq_context * bgq_context = (union fi_bgq_context *)context; bgq_context->flags = 0; /* TODO */ bgq_context->len = xfer_len; bgq_context->buf = NULL; /* TODO */ bgq_context->byte_counter = xfer_len; bgq_context->tag = tag; uint64_t byte_counter_paddr = 0; fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr); /* set the destination torus address and fifo map */ send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi); send_desc->Torus_FIFO_Map = (uint64_t) bgq_dst_addr.fifo_map; send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id = fi_bgq_addr_rec_fifo_id(dest_addr); hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader; hdr->completion.is_local = fi_bgq_addr_is_local(dest_addr); hdr->completion.cntr_paddr_rsh3b = byte_counter_paddr >> 3; fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required); MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo); } else #endif { if (tx_op_flags & (FI_INJECT_COMPLETE | FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)) { #ifdef FI_BGQ_TRACE fprintf(stderr,"eager injecting local completion dput\n"); #endif /* inject the 'local completion' direct put descriptor */ send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo); /* copy the descriptor model into the injection fifo */ qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.local_completion_model); /* initialize the completion entry */ assert(context); assert(((uintptr_t)context & 0x07ull) == 0); /* must be 8 byte aligned */ union fi_bgq_context * bgq_context = (union fi_bgq_context *)context; bgq_context->flags = 0; /* TODO */ bgq_context->len = xfer_len; bgq_context->buf = NULL; /* TODO */ bgq_context->byte_counter = xfer_len; bgq_context->tag = tag; uint64_t byte_counter_paddr = 0; fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr); send_desc->Pa_Payload = MUSPI_GetAtomicAddress(byte_counter_paddr, MUHWI_ATOMIC_OPCODE_LOAD_CLEAR); fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required); MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo); } } } else { /* rendezvous */ #ifdef FI_BGQ_TRACE fprintf(stderr,"rendezvous sending to dest:\n"); FI_BGQ_ADDR_DUMP(&dest_addr); #endif assert((tx_op_flags & FI_INJECT) == 0); const uint64_t is_local = fi_bgq_addr_is_local(dest_addr); /* copy the descriptor model into the injection fifo */ qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.rzv_model[is_local]); /* set the destination torus address and fifo map */ send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi); send_desc->Torus_FIFO_Map = fi_bgq_addr_get_fifo_map(dest_addr); send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id = fi_bgq_addr_rec_fifo_id(dest_addr); union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader; if (is_msg) { fi_bgq_mu_packet_type_set(hdr, FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS); } /* locate the 
payload lookaside slot */
		uint64_t payload_paddr = 0;
		union fi_bgq_mu_packet_payload *payload =
			(union fi_bgq_mu_packet_payload *) fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->tx.injfifo,
				send_desc, &payload_paddr);
		send_desc->Pa_Payload = payload_paddr;

		payload->rendezvous.fifo_map = fi_bgq_addr_get_fifo_map(bgq_dst_addr.fi);

		if (is_contiguous) {
			/* only send one mu iov */
			fi_bgq_cnk_vaddr2paddr(buf, len, &payload->rendezvous.mu_iov[0].src_paddr);
			payload->rendezvous.mu_iov[0].message_length = len;
			hdr->pt2pt.rendezvous.niov_minus_1 = 0;
		} else {
			/* 'buf' is an iovec array and 'len' is the iovec count */
			assert(len <= 31);
			size_t i;
			const struct iovec * iov = (const struct iovec *)buf;
			send_desc->Message_Length += (len-1) * sizeof(struct fi_bgq_mu_iov);
			for (i=0; i<len; ++i) {
				fi_bgq_cnk_vaddr2paddr(iov[i].iov_base, iov[i].iov_len,
					&payload->rendezvous.mu_iov[i].src_paddr);
				payload->rendezvous.mu_iov[i].message_length = iov[i].iov_len;
			}
			hdr->pt2pt.rendezvous.niov_minus_1 = len - 1;
		}

		/* initialize the completion entry */
		assert(context);
		assert(((uintptr_t)context & 0x07ull) == 0);	/* must be 8 byte aligned */
		union fi_bgq_context * bgq_context = (union fi_bgq_context *)context;
		bgq_context->flags = 0;		/* TODO */
		bgq_context->len = xfer_len;
		bgq_context->buf = NULL;	/* TODO */
		bgq_context->byte_counter = xfer_len;
		bgq_context->tag = tag;

		uint64_t byte_counter_paddr = 0;
		fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter,
			sizeof(uint64_t), &byte_counter_paddr);
		payload->rendezvous.cntr_paddr_rsh3b = byte_counter_paddr >> 3;

		hdr->pt2pt.ofi_tag = tag;
		hdr->pt2pt.immediate_data = data;

		MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);

		fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required);
	}

	/* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */
	ret = fi_bgq_unlock_if_required(&bgq_ep->lock, lock_required);
	if (ret) return ret;

	return 0;
}

static inline ssize_t fi_bgq_send_generic(struct fid_ep *ep,
		const void *buf, size_t len, void *desc,
		fi_addr_t dst_addr, uint64_t tag, void *context,
		int lock_required, const unsigned is_msg)
{
	assert(is_msg == 0 || is_msg == 1);

	return fi_bgq_send_generic_flags(ep, buf, len, desc, dst_addr,
			tag, context, 0, lock_required, is_msg,
			1 /* is_contiguous */,
			0 /* do not override flags */,
			0 /* no flags */);
}

/*
 * In FI_PROGRESS_MANUAL mode:
 * The bgq 'recv' implementation is THREAD SAFE and LOCKLESS due to its use of
 * the L2 atomic operations to post the match information to the progress thread.
 * The 'fi_bgq_lock_if_required()' utility function is not used.
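 *
 * The match information is a pointer to an 8-byte-aligned 'union fi_bgq_context';
 * since its low 3 bits are always zero (asserted below), the pointer is stored
 * right-shifted by 3 in the L2 atomic fifo and recovered by the progress thread
 * with a left shift of 3.  A minimal sketch of the post operation used below
 * (illustrative only):
 *
 *   uint64_t context_rsh3b = (uint64_t)context >> 3;
 *   while (l2atomic_fifo_produce(fifo, context_rsh3b) != 0);   // spin until posted
 *
 *   // ... later, in the progress thread ...
 *   union fi_bgq_context *posted = (union fi_bgq_context *)(context_rsh3b << 3);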
*/ static inline ssize_t fi_bgq_recv_generic(struct fid_ep *ep, void *buf, size_t len, void *desc, fi_addr_t src_addr, uint64_t tag, uint64_t ignore, void *context, const int lock_required, const uint64_t is_msg) { assert(is_msg == 0 || is_msg == 1); struct fi_bgq_ep *bgq_ep = container_of(ep, struct fi_bgq_ep, ep_fid); const uint64_t rx_op_flags = bgq_ep->rx.post.op_flags; assert(context); assert(((uintptr_t)context & 0x07ull) == 0); /* must be 8 byte aligned */ union fi_bgq_context * bgq_context = (union fi_bgq_context *)context; bgq_context->flags = rx_op_flags; bgq_context->len = len; bgq_context->buf = buf; bgq_context->src_addr = src_addr; #ifdef FI_BGQ_TRACE fprintf(stderr,"fi_bgq_recv_generic from source addr:\n"); FI_BGQ_ADDR_DUMP(&bgq_context->src_addr); #endif bgq_context->tag = tag; bgq_context->ignore = ignore; bgq_context->byte_counter = (uint64_t)-1; if (FI_BGQ_FABRIC_DIRECT_PROGRESS == FI_PROGRESS_MANUAL) { /* constant expression will compile out */ int ret; ret = fi_bgq_lock_if_required(&bgq_ep->lock, lock_required); if (ret) return ret; #ifdef FI_BGQ_TRACE fprintf(stderr,"fi_bgq_recv_generic calling fi_bgq_ep_progress_manual_recv_fast:\n"); #endif fi_bgq_ep_progress_manual_recv_fast(bgq_ep, is_msg, context); ret = fi_bgq_unlock_if_required(&bgq_ep->lock, lock_required); if (ret) return ret; } else { /* the *only* difference between a 'tagged' and 'non-tagged' recv is * the L2 atomic fifo used to post the receive information */ struct l2atomic_fifo_producer * fifo = &bgq_ep->rx.post.match[is_msg]; uint64_t context_rsh3b = (uint64_t)context >> 3; while (l2atomic_fifo_produce(fifo, context_rsh3b) != 0); /* spin loop! */ } return 0; } /* * In FI_PROGRESS_AUTO mode: * The bgq 'recv' implementation is THREAD SAFE and LOCKLESS due to its use of * the L2 atomic operations to post the match information to the progress thread. * The 'fi_bgq_lock_if_required()' utility function is not used. * * \note The bgq provider asserts the following mode bits which affect * the behavior of this routine: * * - 'FI_ASYNC_IOV' mode bit which requires the application to maintain * the 'msg->msg_iov' iovec array until the operation completes * * - 'FI_LOCAL_MR' mode bit which allows the provider to ignore the 'desc' * parameter .. 
no memory regions are required to access the local
 *   memory
 */
static inline ssize_t fi_bgq_recvmsg_generic(struct fid_ep *ep,
		const struct fi_msg *msg, uint64_t flags,
		int lock_required)
{
	struct fi_bgq_ep *bgq_ep = container_of(ep, struct fi_bgq_ep, ep_fid);

	uint64_t context_rsh3b = 0;
	uint64_t rx_op_flags = 0;

	if (flags & FI_MULTI_RECV) {

		assert(msg->context);
		assert(((uintptr_t)msg->context & 0x07ull) == 0);	/* must be 8 byte aligned */

		union fi_bgq_context * bgq_context =
			(union fi_bgq_context *) msg->context;

		uint64_t len = msg->msg_iov[0].iov_len;
		void * base = msg->msg_iov[0].iov_base;

		assert(msg->iov_count == 1);
		assert(base != NULL);
		if ((uintptr_t)base & 0x07ull) {
			uintptr_t new_base = (((uintptr_t)base + 8) & (~0x07ull));
			len -= (new_base - (uintptr_t)base);
			base = (void *)new_base;
		}
		assert(((uintptr_t)base & 0x07ull) == 0);
		assert(len >= (sizeof(union fi_bgq_context) + bgq_ep->rx.min_multi_recv));
		bgq_context->flags = FI_MULTI_RECV;
		bgq_context->len = len - sizeof(union fi_bgq_context);
		bgq_context->buf = (void *)((uintptr_t)base + sizeof(union fi_bgq_context));
		bgq_context->src_addr = msg->addr;
		bgq_context->byte_counter = 0;
		bgq_context->multi_recv_next = (union fi_bgq_context *)base;
		bgq_context->ignore = (uint64_t)-1;

		context_rsh3b = (uint64_t)bgq_context >> 3;
		rx_op_flags = flags;

	} else if (msg->iov_count == 0) {

		assert(msg->context);
		assert(((uintptr_t)msg->context & 0x07ull) == 0);	/* must be 8 byte aligned */

		union fi_bgq_context * bgq_context =
			(union fi_bgq_context *) msg->context;
		bgq_context->flags = flags;
		bgq_context->len = 0;
		bgq_context->buf = NULL;
		bgq_context->src_addr = msg->addr;
		bgq_context->tag = 0;
		bgq_context->ignore = (uint64_t)-1;
		bgq_context->byte_counter = (uint64_t)-1;

		context_rsh3b = (uint64_t)bgq_context >> 3;
		rx_op_flags = flags;

	} else if (msg->iov_count == 1) {

		assert(msg->context);
		assert(((uintptr_t)msg->context & 0x07ull) == 0);	/* must be 8 byte aligned */

		union fi_bgq_context * bgq_context =
			(union fi_bgq_context *) msg->context;
		bgq_context->flags = flags;
		bgq_context->len = msg->msg_iov[0].iov_len;
		bgq_context->buf = msg->msg_iov[0].iov_base;
		bgq_context->src_addr = msg->addr;
		bgq_context->tag = 0;
		bgq_context->ignore = (uint64_t)-1;
		bgq_context->byte_counter = (uint64_t)-1;

		context_rsh3b = (uint64_t)bgq_context >> 3;
		rx_op_flags = flags;

	} else {
		struct fi_bgq_context_ext * ext;
		if (posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext)))
			return -FI_ENOMEM;

		ext->bgq_context.flags = flags | FI_BGQ_CQ_CONTEXT_EXT;
		ext->bgq_context.byte_counter = (uint64_t)-1;
		ext->bgq_context.src_addr = msg->addr;
		ext->bgq_context.tag = 0;
		ext->bgq_context.ignore = (uint64_t)-1;
		ext->msg.op_context = (struct fi_context *)msg->context;
		ext->msg.iov_count = msg->iov_count;
		ext->msg.iov = (struct iovec *)msg->msg_iov;

		context_rsh3b = (uint64_t)ext >> 3;
		rx_op_flags = flags | FI_BGQ_CQ_CONTEXT_EXT;

		if (FI_BGQ_FABRIC_DIRECT_PROGRESS == FI_PROGRESS_MANUAL) {	/* constant expression will compile out */
			int ret;
			ret = fi_bgq_lock_if_required(&bgq_ep->lock, lock_required);
			if (ret) return ret;

			fi_bgq_ep_progress_manual_recv(bgq_ep,
				1,	/* is_msg */
				(union fi_bgq_context *)(context_rsh3b << 3),
				rx_op_flags,
				1	/* is_context_ext */);

			ret = fi_bgq_unlock_if_required(&bgq_ep->lock, lock_required);
			if (ret) return ret;

			return 0;
		}
	}

	if (FI_BGQ_FABRIC_DIRECT_PROGRESS == FI_PROGRESS_MANUAL) {	/* constant expression will compile out */
		int ret;
		ret = fi_bgq_lock_if_required(&bgq_ep->lock, lock_required);
		if (ret) return ret;

		fi_bgq_ep_progress_manual_recv(bgq_ep,
			1,	/* is_msg */
(union fi_bgq_context *)(context_rsh3b << 3), rx_op_flags, 0 /* is_context_ext */); ret = fi_bgq_unlock_if_required(&bgq_ep->lock, lock_required); if (ret) return ret; } else { /* the *only* difference between a 'tagged' and 'non-tagged' recv is * the L2 atomic fifo used to post the receive information */ struct l2atomic_fifo_producer * fifo = &bgq_ep->rx.post.match[1]; /* TODO - use enum */ while (l2atomic_fifo_produce(fifo, context_rsh3b) != 0); /* spin loop! */ } return 0; } static inline ssize_t fi_bgq_injectdata_generic(struct fid_ep *ep, const void *buf, size_t len, uint64_t data, fi_addr_t dst_addr, uint64_t tag, int lock_required, const unsigned is_msg) { return fi_bgq_inject_generic(ep, buf, len, dst_addr, tag, data, lock_required, is_msg); } static inline ssize_t fi_bgq_senddata_generic(struct fid_ep *ep, const void *buf, size_t len, void *desc, uint64_t data, fi_addr_t dst_addr, uint64_t tag, void *context, int lock_required, const unsigned is_msg) { assert(is_msg == 0 || is_msg == 1); return fi_bgq_send_generic_flags(ep, buf, len, desc, dst_addr, tag, context, data, lock_required, is_msg, 1 /* is_contiguous */, 0 /* do not override flags */, 0 /* no flags */); } static inline ssize_t fi_bgq_inject(struct fid_ep *ep, const void *buf, size_t len, fi_addr_t dest_addr, int lock_required) { return fi_bgq_inject_generic(ep, buf, len, dest_addr, 0, 0, lock_required, 1); } /* * Declare specialized functions that qualify for FABRIC_DIRECT. * - No locks */ #define FI_BGQ_MSG_FABRIC_DIRECT_LOCK 0 FI_BGQ_MSG_SPECIALIZED_FUNC(FI_BGQ_MSG_FABRIC_DIRECT_LOCK) #ifdef FABRIC_DIRECT #define fi_send(ep, buf, len, desc, dest_addr, context) \ (FI_BGQ_MSG_SPECIALIZED_FUNC_NAME(send, \ FI_BGQ_MSG_FABRIC_DIRECT_LOCK) \ (ep, buf, len, desc, dest_addr, context)) #define fi_recv(ep, buf, len, desc, src_addr, context) \ (FI_BGQ_MSG_SPECIALIZED_FUNC_NAME(recv, \ FI_BGQ_MSG_FABRIC_DIRECT_LOCK) \ (ep, buf, len, desc, src_addr, context)) #define fi_inject(ep, buf, len, dest_addr) \ (FI_BGQ_MSG_SPECIALIZED_FUNC_NAME(inject, \ FI_BGQ_MSG_FABRIC_DIRECT_LOCK) \ (ep, buf, len, dest_addr)) #define fi_recvmsg(ep, msg, flags) \ (FI_BGQ_MSG_SPECIALIZED_FUNC_NAME(recvmsg, \ FI_BGQ_MSG_FABRIC_DIRECT_LOCK) \ (ep, msg, flags)) #define fi_senddata(ep, buf, len, desc, data, dest_addr, context) \ (FI_BGQ_MSG_SPECIALIZED_FUNC_NAME(senddata, \ FI_BGQ_MSG_FABRIC_DIRECT_LOCK) \ (ep, buf, len, desc, data, dest_addr, context)) #define fi_injectdata(ep, buf, len, data, dest_addr) \ (FI_BGQ_MSG_SPECIALIZED_FUNC_NAME(injectdata, \ FI_BGQ_MSG_FABRIC_DIRECT_LOCK) \ (ep, buf, len, data, dest_addr)) static inline ssize_t fi_sendmsg(struct fid_ep *ep, const struct fi_msg *msg, uint64_t flags) { return ep->msg->sendmsg(ep, msg, flags); } static inline ssize_t fi_sendv(struct fid_ep *ep, const struct iovec *iov, void **desc, size_t count, fi_addr_t dest_addr, void *context) { return ep->msg->sendv(ep, iov, desc, count, dest_addr, context); } static inline int fi_enable(struct fid_ep *ep) { return ep->fid.ops->control(&ep->fid, FI_ENABLE, NULL); } static inline int fi_cancel(fid_t fid, void *context) { struct fid_ep *ep = container_of(fid, struct fid_ep, fid); return ep->ops->cancel(fid, context); } static inline int fi_endpoint(struct fid_domain *domain, struct fi_info *info, struct fid_ep **ep, void *context) { return domain->ops->endpoint(domain, info, ep, context); } static inline int fi_scalable_ep(struct fid_domain *domain, struct fi_info *info, struct fid_ep **sep, void *context) { return domain->ops->scalable_ep(domain, info, sep, 
context); } static inline int fi_setopt(fid_t fid, int level, int optname, const void *optval, size_t optlen) { struct fid_ep *ep = container_of(fid, struct fid_ep, fid); return ep->ops->setopt(fid, level, optname, optval, optlen); } static inline int fi_getopt(fid_t fid, int level, int optname, void *optval, size_t *optlen) { struct fid_ep *ep = container_of(fid, struct fid_ep, fid); return ep->ops->getopt(fid, level, optname, optval, optlen); } static inline int fi_tx_context(struct fid_ep *ep, int index, struct fi_tx_attr *attr, struct fid_ep **tx_ep, void *context) { return ep->ops->tx_ctx(ep, index, attr, tx_ep, context); } static inline int fi_rx_context(struct fid_ep *ep, int index, struct fi_rx_attr *attr, struct fid_ep **rx_ep, void *context) { return ep->ops->rx_ctx(ep, index, attr, rx_ep, context); } static inline int fi_ep_bind(struct fid_ep *ep, struct fid *bfid, uint64_t flags) { return ep->fid.ops->bind(&ep->fid, bfid, flags); } static inline int fi_scalable_ep_bind(struct fid_ep *sep, struct fid *bfid, uint64_t flags) { return sep->fid.ops->bind(&sep->fid, bfid, flags); } static inline int fi_stx_context(struct fid_domain *domain, struct fi_tx_attr *attr, struct fid_stx **stx, void *context) { return domain->ops->stx_ctx(domain, attr, stx, context); } static inline int fi_ep_alias(struct fid_ep *ep, struct fid_ep **alias_ep, uint64_t flags) { return -FI_ENOSYS; } static inline int fi_passive_ep(struct fid_fabric *fabric, struct fi_info *info, struct fid_pep **pep, void *context) { return fabric->ops->passive_ep(fabric, info, pep, context); } static inline int fi_pep_bind(struct fid_pep *pep, struct fid *bfid, uint64_t flags) { return pep->fid.ops->bind(&pep->fid, bfid, flags); } #endif #endif /* _FI_BGQ_DIRECT_EP_H_ */
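
/*
 * Example usage (illustrative only): in FABRIC_DIRECT mode the fi_send(),
 * fi_recv(), and fi_inject() calls made by an application expand at compile
 * time, via FI_BGQ_MSG_SPECIALIZED_FUNC_NAME, to the specialized lock-free
 * bgq functions declared above rather than indirect calls through 'ep->msg'.
 * Assuming an endpoint 'ep' that has already been created, bound, and
 * enabled, and a destination address 'dest' resolved through the address
 * vector:
 *
 *   char hello[8] = "hello";
 *   ssize_t rc = fi_inject(ep, hello, sizeof(hello), dest);
 *   if (rc) handle_error(rc);   // 'handle_error' is a hypothetical application helper
 *
 * The inject path copies the payload into a lookaside slot in the injection
 * fifo and returns immediately; no completion entry is generated for it.
 */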