/*
 * Copyright (c) 2019 Intel Corporation. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials
 *   provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "config.h"

#include
#include
#include
#include
#include
#include
#include
#include
#include

#if HAVE_GETIFADDRS
#include
#include
#endif

#include
#include
#include
#include
#include
#include
#include

int ofi_av_set_union(struct fid_av_set *dst, const struct fid_av_set *src)
{
	struct util_av_set *src_av_set;
	struct util_av_set *dst_av_set;
	size_t temp_count;
	int i, j;

	src_av_set = container_of(src, struct util_av_set, av_set_fid);
	dst_av_set = container_of(dst, struct util_av_set, av_set_fid);

	assert(src_av_set->av == dst_av_set->av);
	temp_count = dst_av_set->fi_addr_count;

	for (i = 0; i < src_av_set->fi_addr_count; i++) {
		for (j = 0; j < dst_av_set->fi_addr_count; j++) {
			if (dst_av_set->fi_addr_array[j] ==
			    src_av_set->fi_addr_array[i])
				break;
		}
		if (j == dst_av_set->fi_addr_count) {
			dst_av_set->fi_addr_array[temp_count++] =
				src_av_set->fi_addr_array[i];
		}
	}

	dst_av_set->fi_addr_count = temp_count;
	return FI_SUCCESS;
}

int ofi_av_set_intersect(struct fid_av_set *dst, const struct fid_av_set *src)
{
	struct util_av_set *src_av_set;
	struct util_av_set *dst_av_set;
	int i, j, temp;

	src_av_set = container_of(src, struct util_av_set, av_set_fid);
	dst_av_set = container_of(dst, struct util_av_set, av_set_fid);

	assert(src_av_set->av == dst_av_set->av);
	temp = 0;

	for (i = 0; i < src_av_set->fi_addr_count; i++) {
		for (j = temp; j < dst_av_set->fi_addr_count; j++) {
			if (dst_av_set->fi_addr_array[j] ==
			    src_av_set->fi_addr_array[i]) {
				dst_av_set->fi_addr_array[temp++] =
					dst_av_set->fi_addr_array[j];
				break;
			}
		}
	}
	dst_av_set->fi_addr_count = temp;
	return FI_SUCCESS;
}

int ofi_av_set_diff(struct fid_av_set *dst, const struct fid_av_set *src)
{
	struct util_av_set *src_av_set;
	struct util_av_set *dst_av_set;
	int i, j, temp;

	src_av_set = container_of(src, struct util_av_set, av_set_fid);
	dst_av_set = container_of(dst, struct util_av_set, av_set_fid);

	assert(src_av_set->av == dst_av_set->av);
	temp = dst_av_set->fi_addr_count;

	for (i = 0; i < src_av_set->fi_addr_count; i++) {
		for (j = 0; j < temp; j++) {
			if (dst_av_set->fi_addr_array[j] ==
			    src_av_set->fi_addr_array[i]) {
				dst_av_set->fi_addr_array[--temp] =
					dst_av_set->fi_addr_array[j];
				break;
			}
		}
	}
	dst_av_set->fi_addr_count = temp;
	return FI_SUCCESS;
}
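
/*
 * AV set membership helpers: ofi_av_set_insert() appends an address after a
 * linear duplicate check (duplicates are rejected with -FI_EINVAL), and
 * ofi_av_set_remove() swaps the last entry into the removed slot, so member
 * order is not preserved across removals. Both assume fi_addr_array was sized
 * for the full AV when the set was created.
 */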
int ofi_av_set_insert(struct fid_av_set *set, fi_addr_t addr)
{
	struct util_av_set *av_set;
	int i;

	av_set = container_of(set, struct util_av_set, av_set_fid);

	for (i = 0; i < av_set->fi_addr_count; i++) {
		if (av_set->fi_addr_array[i] == addr)
			return -FI_EINVAL;
	}
	av_set->fi_addr_array[av_set->fi_addr_count++] = addr;
	return FI_SUCCESS;
}

int ofi_av_set_remove(struct fid_av_set *set, fi_addr_t addr)
{
	struct util_av_set *av_set;
	int i;

	av_set = container_of(set, struct util_av_set, av_set_fid);

	for (i = 0; i < av_set->fi_addr_count; i++) {
		if (av_set->fi_addr_array[i] == addr) {
			av_set->fi_addr_array[i] =
				av_set->fi_addr_array[--av_set->fi_addr_count];
			return FI_SUCCESS;
		}
	}
	return -FI_EINVAL;
}

/*
 * Collective messages are matched on a tag that packs the collective id into
 * the lower 32 bits and the sender's rank into the upper bits, with
 * OFI_COLL_TAG_FLAG OR'd in.
 */
static inline uint64_t util_coll_form_tag(uint32_t coll_id, uint32_t rank)
{
	uint64_t tag;
	uint64_t src_rank = rank;

	tag = coll_id;
	tag |= (src_rank << 32);

	return OFI_COLL_TAG_FLAG | tag;
}

/* next collective id: group id in the upper bits, per-group sequence below */
static inline uint32_t util_coll_get_next_id(struct util_coll_mc *coll_mc)
{
	uint32_t cid = coll_mc->group_id;

	return cid << 16 | coll_mc->seq++;
}

static inline int util_coll_op_create(struct util_coll_operation **coll_op,
				      struct util_coll_mc *coll_mc,
				      enum util_coll_op_type type,
				      void *context,
				      util_coll_comp_fn_t comp_fn)
{
	*coll_op = calloc(1, sizeof(**coll_op));
	if (!(*coll_op))
		return -FI_ENOMEM;

	(*coll_op)->cid = util_coll_get_next_id(coll_mc);
	(*coll_op)->mc = coll_mc;
	(*coll_op)->type = type;
	(*coll_op)->context = context;
	(*coll_op)->comp_fn = comp_fn;
	dlist_init(&(*coll_op)->work_queue);

	return FI_SUCCESS;
}

static inline void util_coll_op_log_work(struct util_coll_operation *coll_op)
{
#if ENABLE_DEBUG
	struct util_coll_work_item *cur_item = NULL;
	struct util_coll_xfer_item *xfer_item;
	struct dlist_entry *tmp = NULL;
	size_t count = 0;

	FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
	       "Remaining Work for %s:\n",
	       log_util_coll_op_type[coll_op->type]);
	dlist_foreach_container_safe(&coll_op->work_queue,
				     struct util_coll_work_item,
				     cur_item, waiting_entry, tmp) {
		switch (cur_item->type) {
		case UTIL_COLL_SEND:
			xfer_item = container_of(cur_item,
						 struct util_coll_xfer_item,
						 hdr);
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "\t%ld: { %p [%s] SEND TO: 0x%02x FROM: 0x%02lx "
			       "cnt: %d typesize: %ld tag: 0x%02lx }\n",
			       count, cur_item,
			       log_util_coll_state[cur_item->state],
			       xfer_item->remote_rank,
			       coll_op->mc->local_rank, xfer_item->count,
			       ofi_datatype_size(xfer_item->datatype),
			       xfer_item->tag);
			break;
		case UTIL_COLL_RECV:
			xfer_item = container_of(cur_item,
						 struct util_coll_xfer_item,
						 hdr);
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "\t%ld: { %p [%s] RECV FROM: 0x%02x TO: 0x%02lx "
			       "cnt: %d typesize: %ld tag: 0x%02lx }\n",
			       count, cur_item,
			       log_util_coll_state[cur_item->state],
			       xfer_item->remote_rank,
			       coll_op->mc->local_rank, xfer_item->count,
			       ofi_datatype_size(xfer_item->datatype),
			       xfer_item->tag);
			break;
		case UTIL_COLL_REDUCE:
			//reduce_item = container_of(cur_item, struct util_coll_reduce_item, hdr);
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "\t%ld: { %p [%s] REDUCTION }\n",
			       count, cur_item,
			       log_util_coll_state[cur_item->state]);
			break;
		case UTIL_COLL_COPY:
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "\t%ld: { %p [%s] COPY }\n",
			       count, cur_item,
			       log_util_coll_state[cur_item->state]);
			break;
		case UTIL_COLL_COMP:
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "\t%ld: { %p [%s] COMPLETION }\n",
			       count, cur_item,
			       log_util_coll_state[cur_item->state]);
			break;
		default:
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "\t%ld: { %p [%s] UNKNOWN }\n",
			       count, cur_item,
			       log_util_coll_state[cur_item->state]);
			break;
		}
		count++;
	}
#endif
}

static inline void util_coll_op_progress_work(struct util_ep *util_ep,
					      struct util_coll_operation *coll_op)
{
	struct util_coll_work_item *next_ready = NULL;
	struct util_coll_work_item *cur_item = NULL;
	struct util_coll_work_item *prev_item = NULL;
	struct dlist_entry *tmp = NULL;
	int previous_is_head;

	// clean up any completed items while searching for the next ready
	dlist_foreach_container_safe(&coll_op->work_queue,
				     struct util_coll_work_item,
				     cur_item, waiting_entry, tmp) {
		previous_is_head = cur_item->waiting_entry.prev ==
					&cur_item->coll_op->work_queue;
		if (!previous_is_head) {
			prev_item = container_of(cur_item->waiting_entry.prev,
						 struct util_coll_work_item,
						 waiting_entry);
		}

		if (cur_item->state == UTIL_COLL_COMPLETE) {
			// if there is work before cur and cur is fencing, we can't complete
			if (cur_item->fence && !previous_is_head)
				continue;

			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "Removing Completed Work item: %p \n", cur_item);
			dlist_remove(&cur_item->waiting_entry);
			free(cur_item);

			// if the work queue is empty, we're done
			if (dlist_empty(&coll_op->work_queue)) {
				free(coll_op);
				return;
			}
			continue;
		}

		// we can't progress if prior work is fencing
		if (!previous_is_head && prev_item && prev_item->fence) {
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "%p fenced by: %p \n", cur_item, prev_item);
			return;
		}

		// if the current item isn't waiting, it's not the next ready item
		if (cur_item->state != UTIL_COLL_WAITING) {
			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "Work item not waiting: %p [%s]\n", cur_item,
			       log_util_coll_state[cur_item->state]);
			continue;
		}

		FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
		       "Ready item: %p \n", cur_item);
		next_ready = cur_item;
		break;
	}

	if (!next_ready)
		return;

	util_coll_op_log_work(coll_op);

	next_ready->state = UTIL_COLL_PROCESSING;
	slist_insert_tail(&next_ready->ready_entry, &util_ep->coll_ready_queue);
}

static inline void util_coll_op_bind_work(struct util_coll_operation *coll_op,
					  struct util_coll_work_item *item)
{
	item->coll_op = coll_op;
	dlist_insert_tail(&item->waiting_entry, &coll_op->work_queue);
}

static int util_coll_sched_send(struct util_coll_operation *coll_op,
				uint32_t dest, void *buf, int count,
				enum fi_datatype datatype, int fence)
{
	struct util_coll_xfer_item *xfer_item;

	xfer_item = calloc(1, sizeof(*xfer_item));
	if (!xfer_item)
		return -FI_ENOMEM;

	xfer_item->hdr.type = UTIL_COLL_SEND;
	xfer_item->hdr.state = UTIL_COLL_WAITING;
	xfer_item->hdr.fence = fence;
	xfer_item->tag = util_coll_form_tag(coll_op->cid,
					    coll_op->mc->local_rank);
	xfer_item->buf = buf;
	xfer_item->count = count;
	xfer_item->datatype = datatype;
	xfer_item->remote_rank = dest;

	util_coll_op_bind_work(coll_op, &xfer_item->hdr);
	return FI_SUCCESS;
}

static int util_coll_sched_recv(struct util_coll_operation *coll_op,
				uint32_t src, void *buf, int count,
				enum fi_datatype datatype, int fence)
{
	struct util_coll_xfer_item *xfer_item;

	xfer_item = calloc(1, sizeof(*xfer_item));
	if (!xfer_item)
		return -FI_ENOMEM;

	xfer_item->hdr.type = UTIL_COLL_RECV;
	xfer_item->hdr.state = UTIL_COLL_WAITING;
	xfer_item->hdr.fence = fence;
	xfer_item->tag = util_coll_form_tag(coll_op->cid, src);
	xfer_item->buf = buf;
	xfer_item->count = count;
	xfer_item->datatype = datatype;
	xfer_item->remote_rank = src;

	util_coll_op_bind_work(coll_op, &xfer_item->hdr);
	return FI_SUCCESS;
}

static int util_coll_sched_reduce(struct util_coll_operation *coll_op,
				  void *in_buf, void *inout_buf, int count,
				  enum fi_datatype datatype, enum fi_op op,
				  int
				  fence)
{
	struct util_coll_reduce_item *reduce_item;

	reduce_item = calloc(1, sizeof(*reduce_item));
	if (!reduce_item)
		return -FI_ENOMEM;

	reduce_item->hdr.type = UTIL_COLL_REDUCE;
	reduce_item->hdr.state = UTIL_COLL_WAITING;
	reduce_item->hdr.fence = fence;
	reduce_item->in_buf = in_buf;
	reduce_item->inout_buf = inout_buf;
	reduce_item->count = count;
	reduce_item->datatype = datatype;
	reduce_item->op = op;

	util_coll_op_bind_work(coll_op, &reduce_item->hdr);
	return FI_SUCCESS;
}

static int util_coll_sched_copy(struct util_coll_operation *coll_op,
				void *in_buf, void *out_buf, int count,
				enum fi_datatype datatype, int fence)
{
	struct util_coll_copy_item *copy_item;

	copy_item = calloc(1, sizeof(*copy_item));
	if (!copy_item)
		return -FI_ENOMEM;

	copy_item->hdr.type = UTIL_COLL_COPY;
	copy_item->hdr.state = UTIL_COLL_WAITING;
	copy_item->hdr.fence = fence;
	copy_item->in_buf = in_buf;
	copy_item->out_buf = out_buf;
	copy_item->count = count;
	copy_item->datatype = datatype;

	util_coll_op_bind_work(coll_op, &copy_item->hdr);
	return FI_SUCCESS;
}

static int util_coll_sched_comp(struct util_coll_operation *coll_op)
{
	struct util_coll_work_item *comp_item;

	comp_item = calloc(1, sizeof(*comp_item));
	if (!comp_item)
		return -FI_ENOMEM;

	comp_item->type = UTIL_COLL_COMP;
	comp_item->state = UTIL_COLL_WAITING;
	comp_item->fence = 1;

	util_coll_op_bind_work(coll_op, comp_item);
	return FI_SUCCESS;
}

/* TODO: when this fails, clean up the already scheduled work in this function */
/*
 * Recursive-doubling allreduce: when the group size is not a power of two,
 * the first 2 * rem ranks pair up so the even rank of each pair folds its
 * data into the odd rank and drops out; the remaining power-of-two
 * participants exchange and reduce at doubling distances, and the folded-out
 * ranks receive the final result at the end.
 */
static int util_coll_allreduce(struct util_coll_operation *coll_op,
			       const void *send_buf, void *result,
			       void *tmp_buf, int count,
			       enum fi_datatype datatype, enum fi_op op)
{
	uint64_t rem, pof2, my_new_id;
	uint64_t local, remote, next_remote;
	int ret;
	uint64_t mask = 1;

	pof2 = rounddown_power_of_two(coll_op->mc->av_set->fi_addr_count);
	rem = coll_op->mc->av_set->fi_addr_count - pof2;
	local = coll_op->mc->local_rank;

	// copy initial send data to result
	memcpy(result, send_buf, count * ofi_datatype_size(datatype));

	if (local < 2 * rem) {
		if (local % 2 == 0) {
			ret = util_coll_sched_send(coll_op, local + 1, result,
						   count, datatype, 1);
			if (ret)
				return ret;

			my_new_id = -1;
		} else {
			ret = util_coll_sched_recv(coll_op, local - 1, tmp_buf,
						   count, datatype, 1);
			if (ret)
				return ret;

			my_new_id = local / 2;

			ret = util_coll_sched_reduce(coll_op, tmp_buf, result,
						     count, datatype, op, 1);
			if (ret)
				return ret;
		}
	} else {
		my_new_id = local - rem;
	}

	if (my_new_id != -1) {
		while (mask < pof2) {
			next_remote = my_new_id ^ mask;
			remote = (next_remote < rem) ?
				 next_remote * 2 + 1 : next_remote + rem;

			// receive remote data into tmp buf
			ret = util_coll_sched_recv(coll_op, remote, tmp_buf,
						   count, datatype, 0);
			if (ret)
				return ret;

			// send result buf, which has the current total
			ret = util_coll_sched_send(coll_op, remote, result,
						   count, datatype, 1);
			if (ret)
				return ret;

			if (remote < local) {
				// reduce received remote into result buf
				ret = util_coll_sched_reduce(coll_op, tmp_buf,
							     result, count,
							     datatype, op, 1);
				if (ret)
					return ret;
			} else {
				// reduce local result into received data
				ret = util_coll_sched_reduce(coll_op, result,
							     tmp_buf, count,
							     datatype, op, 1);
				if (ret)
					return ret;

				// copy total into result
				ret = util_coll_sched_copy(coll_op, tmp_buf,
							   result, count,
							   datatype, 1);
				if (ret)
					return ret;
			}
			mask <<= 1;
		}
	}

	if (local < 2 * rem) {
		if (local % 2) {
			ret = util_coll_sched_send(coll_op, local - 1, result,
						   count, datatype, 1);
			if (ret)
				return ret;
		} else {
			ret = util_coll_sched_recv(coll_op, local + 1, result,
						   count, datatype, 1);
			if (ret)
				return ret;
		}
	}
	return FI_SUCCESS;
}

static int util_coll_allgather(struct util_coll_operation *coll_op,
			       const void *send_buf, void *result, int count,
			       enum fi_datatype datatype)
{
	// allgather implemented using ring algorithm
	int64_t ret, i, cur_offset, next_offset;
	size_t nbytes, numranks;
	uint64_t local_rank, left_rank, right_rank;

	local_rank = coll_op->mc->local_rank;
	nbytes = ofi_datatype_size(datatype) * count;
	numranks = coll_op->mc->av_set->fi_addr_count;

	// copy the local value to the appropriate place in result buffer
	ret = util_coll_sched_copy(coll_op, (void *) send_buf,
				   (char *) result + (local_rank * nbytes),
				   count, datatype, 1);
	if (ret)
		return ret;

	// send to right, recv from left
	left_rank = (numranks + local_rank - 1) % numranks;
	right_rank = (local_rank + 1) % numranks;

	cur_offset = local_rank;
	next_offset = left_rank;

	// fill in result with data going right to left
	for (i = 1; i < numranks; i++) {
		ret = util_coll_sched_send(coll_op, right_rank,
					   (char *) result +
						(cur_offset * nbytes),
					   count, datatype, 0);
		if (ret)
			return ret;

		ret = util_coll_sched_recv(coll_op, left_rank,
					   (char *) result +
						(next_offset * nbytes),
					   count, datatype, 1);
		if (ret)
			return ret;

		cur_offset = next_offset;
		next_offset = (numranks + next_offset - 1) % numranks;
	}

	return FI_SUCCESS;
}

static size_t util_binomial_tree_values_to_recv(uint64_t rank, size_t numranks)
{
	size_t nvalues = 0x1 << (ofi_lsb(rank) - 1);

	if (numranks < rank + nvalues)
		nvalues = numranks - rank;

	return nvalues;
}

static int util_coll_scatter(struct util_coll_operation *coll_op,
			     const void *data, void *result, void **temp,
			     size_t count, uint64_t root,
			     enum fi_datatype datatype)
{
	// scatter implemented with binomial tree algorithm
	uint64_t local_rank, relative_rank;
	size_t nbytes, numranks, send_cnt, cur_cnt = 0;
	int ret, mask, remote_rank;
	void *send_data;

	local_rank = coll_op->mc->local_rank;
	numranks = coll_op->mc->av_set->fi_addr_count;
	relative_rank = (local_rank >= root) ?
				local_rank - root : local_rank - root + numranks;
	nbytes = count * ofi_datatype_size(datatype);

	// check if we need to participate
	if (count == 0)
		return FI_SUCCESS;

	// non-root even nodes get a temp buffer for receiving data
	// these nodes may need to send part of what they receive
	if (relative_rank && !(relative_rank % 2)) {
		cur_cnt = count *
			  util_binomial_tree_values_to_recv(relative_rank,
							    numranks);
		*temp = malloc(cur_cnt * ofi_datatype_size(datatype));
		if (!*temp)
			return -FI_ENOMEM;
	}

	if (local_rank == root) {
		cur_cnt = count * numranks;
		if (root != 0) {
			// if we're root but not rank 0, we need to reorder the
			// send buffer according to destination rank. if we're
			// rank 3, data intended for ranks 0-2 will be moved to
			// the end
			*temp = malloc(cur_cnt * ofi_datatype_size(datatype));
			if (!*temp)
				return -FI_ENOMEM;

			ret = util_coll_sched_copy(coll_op,
						   (char *) data + nbytes * local_rank,
						   *temp,
						   (numranks - local_rank) * count,
						   datatype, 1);
			if (ret)
				return ret;

			ret = util_coll_sched_copy(coll_op, (char *) data,
						   (char *) *temp +
							(numranks - local_rank) * nbytes,
						   local_rank * count,
						   datatype, 1);
			if (ret)
				return ret;
		}
	}

	// set up all receives
	mask = 0x1;
	while (mask < numranks) {
		if (relative_rank & mask) {
			remote_rank = local_rank - mask;
			if (remote_rank < 0)
				remote_rank += numranks;

			if (relative_rank % 2) {
				// leaf node, we're receiving the actual data
				ret = util_coll_sched_recv(coll_op, remote_rank,
							   result, count,
							   datatype, 1);
				if (ret)
					return ret;
			} else {
				// branch node, we're receiving data which we've got to forward
				ret = util_coll_sched_recv(coll_op, remote_rank,
							   *temp, cur_cnt,
							   datatype, 1);
				if (ret)
					return ret;
			}
			break;
		}
		mask <<= 1;
	}

	// set up all sends
	send_data = root == local_rank && root == 0 ? (void *) data : *temp;
	mask >>= 1;

	while (mask > 0) {
		if (relative_rank + mask < numranks) {
			// to this point, cur_cnt has represented the number of
			// values we expect to store in our data buf
			// from here on, cur_cnt is the number of values we have
			// left to forward from the data buf
			send_cnt = cur_cnt - count * mask;

			remote_rank = local_rank + mask;
			if (remote_rank >= numranks)
				remote_rank -= numranks;

			FI_DBG(coll_op->mc->av_set->av->prov, FI_LOG_CQ,
			       "MASK: 0x%0x CUR_CNT: %ld SENDING: %ld TO: %d\n",
			       mask, cur_cnt, send_cnt, remote_rank);

			assert(send_cnt > 0);

			ret = util_coll_sched_send(coll_op, remote_rank,
						   (char *) send_data + nbytes * mask,
						   send_cnt, datatype, 1);
			if (ret)
				return ret;

			cur_cnt -= send_cnt;
		}
		mask >>= 1;
	}

	if (!(relative_rank % 2)) {
		// for the root and all even nodes, we've got to copy
		// our local data to the result buffer
		ret = util_coll_sched_copy(coll_op, send_data, result, count,
					   datatype, 1);
		if (ret)
			return ret;
	}

	return FI_SUCCESS;
}

static int util_coll_close(struct fid *fid)
{
	struct util_coll_mc *coll_mc;

	coll_mc = container_of(fid, struct util_coll_mc, mc_fid.fid);

	ofi_atomic_dec32(&coll_mc->av_set->ref);
	free(coll_mc);
	return FI_SUCCESS;
}

static struct fi_ops util_coll_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = util_coll_close,
	.bind = fi_no_bind,
	.control = fi_no_control,
	.ops_open = fi_no_ops_open,
};

static inline void util_coll_mc_init(struct util_coll_mc *coll_mc,
				     struct util_av_set *av_set,
				     struct fid_ep *ep, void *context)
{
	coll_mc->mc_fid.fid.fclass = FI_CLASS_MC;
	coll_mc->mc_fid.fid.context = context;
	coll_mc->mc_fid.fid.ops = &util_coll_fi_ops;
	coll_mc->mc_fid.fi_addr = (uintptr_t) coll_mc;

	coll_mc->ep = ep;

	assert(av_set != NULL);
	ofi_atomic_inc32(&av_set->ref);
	coll_mc->av_set = av_set;
}

static int ofi_av_set_addr(struct fid_av_set *set, fi_addr_t
			   *coll_addr)
{
	struct util_av_set *av_set;

	av_set = container_of(set, struct util_av_set, av_set_fid);
	*coll_addr = (uintptr_t) &av_set->coll_mc;

	return FI_SUCCESS;
}

/* TODO: Figure out requirements for using collectives.
 * e.g. require local address to be in AV?
 * Determine best way to handle first join request
 */
static int util_coll_find_local_rank(struct fid_ep *ep,
				     struct util_coll_mc *coll_mc)
{
	size_t addrlen;
	char *addr;
	int ret;
	fi_addr_t my_addr;
	int i;

	addrlen = 0;
	ret = fi_getname(&ep->fid, NULL, &addrlen);
	if (ret != FI_SUCCESS && addrlen == 0)
		return ret;

	addr = calloc(1, addrlen);
	if (!addr)
		return -FI_ENOMEM;

	ret = fi_getname(&ep->fid, addr, &addrlen);
	if (ret) {
		free(addr);
		return ret;
	}

	my_addr = ofi_av_lookup_fi_addr(coll_mc->av_set->av, addr);

	coll_mc->local_rank = FI_ADDR_NOTAVAIL;
	if (my_addr != FI_ADDR_NOTAVAIL) {
		for (i = 0; i < coll_mc->av_set->fi_addr_count; i++)
			if (coll_mc->av_set->fi_addr_array[i] == my_addr) {
				coll_mc->local_rank = i;
				break;
			}
	}

	free(addr);
	return FI_SUCCESS;
}

void util_coll_join_comp(struct util_coll_operation *coll_op)
{
	struct fi_eq_entry entry;
	struct util_ep *ep = container_of(coll_op->mc->ep, struct util_ep,
					  ep_fid);

	coll_op->data.join.new_mc->seq = 0;
	coll_op->data.join.new_mc->group_id =
		ofi_bitmask_get_lsbset(coll_op->data.join.data);

	// mark the local mask bit
	ofi_bitmask_unset(ep->coll_cid_mask,
			  coll_op->data.join.new_mc->group_id);

	/* write to the eq */
	memset(&entry, 0, sizeof(entry));
	entry.fid = &coll_op->mc->mc_fid.fid;
	entry.context = coll_op->context;

	if (ofi_eq_write(&ep->eq->eq_fid, FI_JOIN_COMPLETE, &entry,
			 sizeof(struct fi_eq_entry), FI_COLLECTIVE) < 0)
		FI_WARN(ep->domain->fabric->prov, FI_LOG_DOMAIN,
			"join collective - eq write failed\n");

	ofi_bitmask_free(&coll_op->data.join.data);
	ofi_bitmask_free(&coll_op->data.join.tmp);
}

void util_coll_collective_comp(struct util_coll_operation *coll_op)
{
	struct util_ep *ep;

	ep = container_of(coll_op->mc->ep, struct util_ep, ep_fid);

	if (ofi_cq_write(ep->tx_cq, coll_op->context, FI_COLLECTIVE, 0, 0, 0, 0))
		FI_WARN(ep->domain->fabric->prov, FI_LOG_DOMAIN,
			"barrier collective - cq write failed\n");

	switch (coll_op->type) {
	case UTIL_COLL_ALLREDUCE_OP:
		free(coll_op->data.allreduce.data);
		break;
	case UTIL_COLL_SCATTER_OP:
		free(coll_op->data.scatter);
		break;
	case UTIL_COLL_BROADCAST_OP:
		free(coll_op->data.broadcast.chunk);
		free(coll_op->data.broadcast.scatter);
		break;
	case UTIL_COLL_JOIN_OP:
	case UTIL_COLL_BARRIER_OP:
	case UTIL_COLL_ALLGATHER_OP:
	default:
		//nothing to clean up
		break;
	}
}

static int util_coll_proc_reduce_item(struct util_coll_reduce_item *reduce_item)
{
	if (FI_MIN <= reduce_item->op && FI_BXOR >= reduce_item->op) {
		ofi_atomic_write_handlers[reduce_item->op]
					 [reduce_item->datatype](
						 reduce_item->inout_buf,
						 reduce_item->in_buf,
						 reduce_item->count);
	} else {
		return -FI_ENOSYS;
	}
	return FI_SUCCESS;
}

int util_coll_process_xfer_item(struct util_coll_xfer_item *item)
{
	struct iovec iov;
	struct fi_msg_tagged msg;
	struct util_coll_mc *mc = item->hdr.coll_op->mc;
	int ret;

	msg.msg_iov = &iov;
	msg.desc = NULL;
	msg.iov_count = 1;
	msg.ignore = 0;
	msg.context = item;
	msg.data = 0;
	msg.tag = item->tag;
	msg.addr = mc->av_set->fi_addr_array[item->remote_rank];

	iov.iov_base = item->buf;
	iov.iov_len = (item->count * ofi_datatype_size(item->datatype));

	if (item->hdr.type == UTIL_COLL_SEND) {
		ret = fi_tsendmsg(mc->ep, &msg, FI_COLLECTIVE);
		if (!ret)
			FI_DBG(mc->av_set->av->prov, FI_LOG_CQ,
			       "%p SEND [0x%02lx] -> [0x%02x] cnt: %d sz: %ld\n",
			       item, item->hdr.coll_op->mc->local_rank,
			       item->remote_rank,
			       item->count,
			       item->count * ofi_datatype_size(item->datatype));
		return ret;
	} else if (item->hdr.type == UTIL_COLL_RECV) {
		ret = fi_trecvmsg(mc->ep, &msg, FI_COLLECTIVE);
		if (!ret)
			FI_DBG(mc->av_set->av->prov, FI_LOG_CQ,
			       "%p RECV [0x%02lx] <- [0x%02x] cnt: %d sz: %ld\n",
			       item, item->hdr.coll_op->mc->local_rank,
			       item->remote_rank, item->count,
			       item->count * ofi_datatype_size(item->datatype));
		return ret;
	}

	return -FI_ENOSYS;
}

int ofi_coll_ep_progress(struct fid_ep *ep)
{
	struct util_coll_work_item *work_item;
	struct util_coll_reduce_item *reduce_item;
	struct util_coll_copy_item *copy_item;
	struct util_coll_xfer_item *xfer_item;
	struct util_coll_operation *coll_op;
	struct util_ep *util_ep;
	int ret;

	util_ep = container_of(ep, struct util_ep, ep_fid);

	while (!slist_empty(&util_ep->coll_ready_queue)) {
		slist_remove_head_container(&util_ep->coll_ready_queue,
					    struct util_coll_work_item,
					    work_item, ready_entry);
		coll_op = work_item->coll_op;
		switch (work_item->type) {
		case UTIL_COLL_SEND:
			xfer_item = container_of(work_item,
						 struct util_coll_xfer_item,
						 hdr);
			ret = util_coll_process_xfer_item(xfer_item);
			if (ret && ret == -FI_EAGAIN) {
				slist_insert_tail(&work_item->ready_entry,
						  &util_ep->coll_ready_queue);
				goto out;
			}
			break;
		case UTIL_COLL_RECV:
			xfer_item = container_of(work_item,
						 struct util_coll_xfer_item,
						 hdr);
			ret = util_coll_process_xfer_item(xfer_item);
			if (ret)
				goto out;
			break;
		case UTIL_COLL_REDUCE:
			reduce_item = container_of(work_item,
						   struct util_coll_reduce_item,
						   hdr);
			ret = util_coll_proc_reduce_item(reduce_item);
			if (ret)
				goto out;

			reduce_item->hdr.state = UTIL_COLL_COMPLETE;
			break;
		case UTIL_COLL_COPY:
			copy_item = container_of(work_item,
						 struct util_coll_copy_item,
						 hdr);
			memcpy(copy_item->out_buf, copy_item->in_buf,
			       copy_item->count *
				       ofi_datatype_size(copy_item->datatype));

			copy_item->hdr.state = UTIL_COLL_COMPLETE;
			break;
		case UTIL_COLL_COMP:
			if (work_item->coll_op->comp_fn)
				work_item->coll_op->comp_fn(work_item->coll_op);

			work_item->state = UTIL_COLL_COMPLETE;
			break;
		default:
			ret = -FI_ENOSYS;
			goto out;
		}

		util_coll_op_progress_work(util_ep, coll_op);
	}
	ret = FI_SUCCESS;
out:
	return ret;
}

int ofi_join_collective(struct fid_ep *ep, fi_addr_t coll_addr,
			const struct fid_av_set *set, uint64_t flags,
			struct fid_mc **mc, void *context)
{
	struct util_coll_mc *new_coll_mc;
	struct util_av_set *av_set;
	struct util_coll_mc *coll_mc;
	struct util_coll_operation *join_op;
	struct util_ep *util_ep;
	int ret;

	av_set = container_of(set, struct util_av_set, av_set_fid);

	if (coll_addr == FI_ADDR_NOTAVAIL) {
		assert(av_set->av->coll_mc != NULL);
		coll_mc = av_set->av->coll_mc;
	} else {
		coll_mc = (struct util_coll_mc *) ((uintptr_t) coll_addr);
	}

	new_coll_mc = calloc(1, sizeof(*new_coll_mc));
	if (!new_coll_mc)
		return -FI_ENOMEM;

	// set up the new mc for future collectives
	util_coll_mc_init(new_coll_mc, av_set, ep, context);

	coll_mc->ep = ep;
	util_ep = container_of(ep, struct util_ep, ep_fid);

	/* get the rank */
	util_coll_find_local_rank(ep, new_coll_mc);
	util_coll_find_local_rank(ep, coll_mc);

	ret = util_coll_op_create(&join_op, coll_mc, UTIL_COLL_JOIN_OP, context,
				  util_coll_join_comp);
	if (ret)
		goto err1;

	join_op->data.join.new_mc = new_coll_mc;

	ret = ofi_bitmask_create(&join_op->data.join.data, OFI_MAX_GROUP_ID);
	if (ret)
		goto err2;

	ret = ofi_bitmask_create(&join_op->data.join.tmp, OFI_MAX_GROUP_ID);
	if (ret)
		goto err3;

	ret = util_coll_allreduce(join_op, util_ep->coll_cid_mask->bytes,
				  join_op->data.join.data.bytes,
				  join_op->data.join.tmp.bytes,
				  ofi_bitmask_bytesize(util_ep->coll_cid_mask),
				  FI_UINT8,
				  FI_BAND);
	if (ret)
		goto err4;

	ret = util_coll_sched_comp(join_op);
	if (ret)
		goto err4;

	util_coll_op_progress_work(util_ep, join_op);

	*mc = &new_coll_mc->mc_fid;
	return FI_SUCCESS;

err4:
	ofi_bitmask_free(&join_op->data.join.tmp);
err3:
	ofi_bitmask_free(&join_op->data.join.data);
err2:
	free(join_op);
err1:
	free(new_coll_mc);
	return ret;
}

static struct fi_ops_av_set util_av_set_ops = {
	.set_union = ofi_av_set_union,
	.intersect = ofi_av_set_intersect,
	.diff = ofi_av_set_diff,
	.insert = ofi_av_set_insert,
	.remove = ofi_av_set_remove,
	.addr = ofi_av_set_addr
};

static int util_coll_copy_from_av(struct util_av *av, void *addr,
				  fi_addr_t fi_addr, void *arg)
{
	struct util_av_set *av_set = (struct util_av_set *) arg;

	av_set->fi_addr_array[av_set->fi_addr_count++] = fi_addr;
	return FI_SUCCESS;
}

static int util_av_set_close(struct fid *fid)
{
	struct util_av_set *av_set;

	av_set = container_of(fid, struct util_av_set, av_set_fid.fid);

	// release reference taken by internal coll_mc embedded in av_set
	ofi_atomic_dec32(&av_set->ref);
	if (ofi_atomic_get32(&av_set->ref) > 0)
		return -FI_EBUSY;

	free(av_set);
	return FI_SUCCESS;
}

static struct fi_ops util_av_set_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = util_av_set_close,
	.bind = fi_no_bind,
	.control = fi_no_control,
	.ops_open = fi_no_ops_open,
};

static inline int util_av_set_init(struct util_av_set *av_set,
				   struct util_av *util_av, void *context)
{
	int ret = FI_SUCCESS;

	av_set->av_set_fid.ops = &util_av_set_ops;
	av_set->av_set_fid.fid.fclass = FI_CLASS_AV_SET;
	av_set->av_set_fid.fid.context = context;
	av_set->av_set_fid.fid.ops = &util_av_set_fi_ops;
	av_set->av = util_av;
	ofi_atomic_initialize32(&av_set->ref, 0);

	ret = fastlock_init(&av_set->lock);
	return ret;
}

static int util_coll_av_init(struct util_av *av)
{
	struct util_coll_mc *coll_mc;
	int ret;

	assert(!av->coll_mc);

	coll_mc = calloc(1, sizeof(*coll_mc));
	if (!coll_mc)
		return -FI_ENOMEM;

	coll_mc->av_set = calloc(1, sizeof(*coll_mc->av_set));
	if (!coll_mc->av_set) {
		ret = -FI_ENOMEM;
		goto err1;
	}

	ret = util_av_set_init(coll_mc->av_set, av, NULL);
	if (ret)
		goto err3;

	coll_mc->av_set->fi_addr_array =
		calloc(av->count, sizeof(*coll_mc->av_set->fi_addr_array));
	if (!coll_mc->av_set->fi_addr_array) {
		ret = -FI_ENOMEM;
		goto err2;
	}

	ret = ofi_av_elements_iter(av, util_coll_copy_from_av,
				   (void *) coll_mc->av_set);
	if (ret)
		goto err4;

	util_coll_mc_init(coll_mc, coll_mc->av_set, NULL, NULL);

	av->coll_mc = coll_mc;
	return FI_SUCCESS;

err4:
	fastlock_destroy(&coll_mc->av_set->lock);
err3:
	free(coll_mc->av_set->fi_addr_array);
err2:
	free(coll_mc->av_set);
err1:
	free(coll_mc);
	return ret;
}

int ofi_av_set(struct fid_av *av, struct fi_av_set_attr *attr,
	       struct fid_av_set **av_set_fid, void *context)
{
	struct util_av *util_av = container_of(av, struct util_av, av_fid);
	struct util_av_set *av_set;
	int ret, iter;

	if (!util_av->coll_mc) {
		ret = util_coll_av_init(util_av);
		if (ret)
			return ret;
	}

	av_set = calloc(1, sizeof(*av_set));
	if (!av_set)
		return -FI_ENOMEM;

	ret = util_av_set_init(av_set, util_av, context);
	if (ret)
		goto err1;

	av_set->fi_addr_array = calloc(util_av->count,
				       sizeof(*av_set->fi_addr_array));
	if (!av_set->fi_addr_array) {
		ret = -FI_ENOMEM;
		goto err2;
	}

	for (iter = 0; iter < attr->count; iter++) {
		av_set->fi_addr_array[iter] =
			util_av->coll_mc->av_set->fi_addr_array[iter * attr->stride];
		av_set->fi_addr_count++;
	}

	util_coll_mc_init(&av_set->coll_mc, av_set, NULL, context);

	(*av_set_fid) = &av_set->av_set_fid;
	return FI_SUCCESS;

err2:
	fastlock_destroy(&av_set->lock);
err1:
	free(av_set);
	return ret;
}

ssize_t
ofi_ep_barrier(struct fid_ep *ep, fi_addr_t coll_addr, void *context)
{
	struct util_coll_mc *coll_mc;
	struct util_coll_operation *barrier_op;
	struct util_ep *util_ep;
	uint64_t send;
	int ret;

	coll_mc = (struct util_coll_mc *) ((uintptr_t) coll_addr);

	ret = util_coll_op_create(&barrier_op, coll_mc, UTIL_COLL_BARRIER_OP,
				  context, util_coll_collective_comp);
	if (ret)
		return ret;

	send = ~barrier_op->mc->local_rank;
	ret = util_coll_allreduce(barrier_op, &send,
				  &barrier_op->data.barrier.data,
				  &barrier_op->data.barrier.tmp, 1, FI_UINT64,
				  FI_BAND);
	if (ret)
		goto err1;

	ret = util_coll_sched_comp(barrier_op);
	if (ret)
		goto err1;

	util_ep = container_of(ep, struct util_ep, ep_fid);
	util_coll_op_progress_work(util_ep, barrier_op);

	return FI_SUCCESS;

err1:
	free(barrier_op);
	return ret;
}

ssize_t ofi_ep_allreduce(struct fid_ep *ep, const void *buf, size_t count,
			 void *desc, void *result, void *result_desc,
			 fi_addr_t coll_addr, enum fi_datatype datatype,
			 enum fi_op op, uint64_t flags, void *context)
{
	struct util_coll_mc *coll_mc;
	struct util_coll_operation *allreduce_op;
	struct util_ep *util_ep;
	int ret;

	coll_mc = (struct util_coll_mc *) ((uintptr_t) coll_addr);

	ret = util_coll_op_create(&allreduce_op, coll_mc, UTIL_COLL_ALLREDUCE_OP,
				  context, util_coll_collective_comp);
	if (ret)
		return ret;

	allreduce_op->data.allreduce.size = count * ofi_datatype_size(datatype);
	allreduce_op->data.allreduce.data = calloc(count,
						   ofi_datatype_size(datatype));
	if (!allreduce_op->data.allreduce.data) {
		ret = -FI_ENOMEM;
		goto err1;
	}

	ret = util_coll_allreduce(allreduce_op, buf, result,
				  allreduce_op->data.allreduce.data, count,
				  datatype, op);
	if (ret)
		goto err2;

	ret = util_coll_sched_comp(allreduce_op);
	if (ret)
		goto err2;

	util_ep = container_of(ep, struct util_ep, ep_fid);
	util_coll_op_progress_work(util_ep, allreduce_op);

	return FI_SUCCESS;

err2:
	free(allreduce_op->data.allreduce.data);
err1:
	free(allreduce_op);
	return ret;
}

ssize_t ofi_ep_allgather(struct fid_ep *ep, const void *buf, size_t count,
			 void *desc, void *result, void *result_desc,
			 fi_addr_t coll_addr, enum fi_datatype datatype,
			 uint64_t flags, void *context)
{
	struct util_coll_mc *coll_mc;
	struct util_coll_operation *allgather_op;
	struct util_ep *util_ep;
	int ret;

	coll_mc = (struct util_coll_mc *) ((uintptr_t) coll_addr);

	ret = util_coll_op_create(&allgather_op, coll_mc, UTIL_COLL_ALLGATHER_OP,
				  context, util_coll_collective_comp);
	if (ret)
		return ret;

	ret = util_coll_allgather(allgather_op, buf, result, count, datatype);
	if (ret)
		goto err;

	ret = util_coll_sched_comp(allgather_op);
	if (ret)
		goto err;

	util_ep = container_of(ep, struct util_ep, ep_fid);
	util_coll_op_progress_work(util_ep, allgather_op);

	return FI_SUCCESS;

err:
	free(allgather_op);
	return ret;
}

ssize_t ofi_ep_scatter(struct fid_ep *ep, const void *buf, size_t count,
		       void *desc, void *result, void *result_desc,
		       fi_addr_t coll_addr, fi_addr_t root_addr,
		       enum fi_datatype datatype, uint64_t flags, void *context)
{
	struct util_coll_mc *coll_mc;
	struct util_coll_operation *scatter_op;
	struct util_ep *util_ep;
	int ret;

	coll_mc = (struct util_coll_mc *) ((uintptr_t) coll_addr);

	ret = util_coll_op_create(&scatter_op, coll_mc, UTIL_COLL_SCATTER_OP,
				  context, util_coll_collective_comp);
	if (ret)
		return ret;

	ret = util_coll_scatter(scatter_op, buf, result,
				&scatter_op->data.scatter, count, root_addr,
				datatype);
	if (ret)
		goto err;

	ret = util_coll_sched_comp(scatter_op);
	if (ret)
		goto err;

	util_ep = container_of(ep, struct util_ep, ep_fid);
	util_coll_op_progress_work(util_ep, scatter_op);

	return FI_SUCCESS;

err:
	free(scatter_op);
	return
		ret;
}

ssize_t ofi_ep_broadcast(struct fid_ep *ep, void *buf, size_t count, void *desc,
			 fi_addr_t coll_addr, fi_addr_t root_addr,
			 enum fi_datatype datatype, uint64_t flags,
			 void *context)
{
	struct util_coll_mc *coll_mc;
	struct util_coll_operation *broadcast_op;
	struct util_ep *util_ep;
	int ret, chunk_cnt, numranks, local;

	coll_mc = (struct util_coll_mc *) ((uintptr_t) coll_addr);

	ret = util_coll_op_create(&broadcast_op, coll_mc, UTIL_COLL_BROADCAST_OP,
				  context, util_coll_collective_comp);
	if (ret)
		return ret;

	// broadcast is built from a scatter of per-rank chunks from the root
	// followed by an allgather that reassembles the chunks in every buf
	local = broadcast_op->mc->local_rank;
	numranks = broadcast_op->mc->av_set->fi_addr_count;
	chunk_cnt = (count + numranks - 1) / numranks;
	if (chunk_cnt * local > count &&
	    chunk_cnt * local - (int) count > chunk_cnt)
		chunk_cnt = 0;

	broadcast_op->data.broadcast.chunk =
		malloc(chunk_cnt * ofi_datatype_size(datatype));
	if (!broadcast_op->data.broadcast.chunk) {
		ret = -FI_ENOMEM;
		goto err1;
	}

	ret = util_coll_scatter(broadcast_op, buf,
				broadcast_op->data.broadcast.chunk,
				&broadcast_op->data.broadcast.scatter,
				chunk_cnt, root_addr, datatype);
	if (ret)
		goto err2;

	ret = util_coll_allgather(broadcast_op,
				  broadcast_op->data.broadcast.chunk, buf,
				  chunk_cnt, datatype);
	if (ret)
		goto err2;

	ret = util_coll_sched_comp(broadcast_op);
	if (ret)
		goto err2;

	util_ep = container_of(ep, struct util_ep, ep_fid);
	util_coll_op_progress_work(util_ep, broadcast_op);

	return FI_SUCCESS;

err2:
	free(broadcast_op->data.broadcast.chunk);
err1:
	free(broadcast_op);
	return ret;
}

void ofi_coll_handle_xfer_comp(uint64_t tag, void *ctx)
{
	struct util_ep *util_ep;
	struct util_coll_xfer_item *xfer_item =
		(struct util_coll_xfer_item *) ctx;

	xfer_item->hdr.state = UTIL_COLL_COMPLETE;

	FI_DBG(xfer_item->hdr.coll_op->mc->av_set->av->prov, FI_LOG_CQ,
	       "\tXfer complete: { %p %s Remote: 0x%02x Local: 0x%02lx "
	       "cnt: %d typesize: %ld }\n",
	       xfer_item,
	       xfer_item->hdr.type == UTIL_COLL_SEND ? "SEND" : "RECV",
	       xfer_item->remote_rank, xfer_item->hdr.coll_op->mc->local_rank,
	       xfer_item->count, ofi_datatype_size(xfer_item->datatype));

	util_ep = container_of(xfer_item->hdr.coll_op->mc->ep, struct util_ep,
			       ep_fid);
	util_coll_op_progress_work(util_ep, xfer_item->hdr.coll_op);
}

int ofi_query_collective(struct fid_domain *domain, enum fi_collective_op coll,
			 struct fi_collective_attr *attr, uint64_t flags)
{
	int ret;

	if (!attr || attr->mode != 0)
		return -FI_EINVAL;

	switch (coll) {
	case FI_BARRIER:
	case FI_ALLGATHER:
	case FI_SCATTER:
	case FI_BROADCAST:
		ret = FI_SUCCESS;
		break;
	case FI_ALLREDUCE:
		if (FI_MIN <= attr->op && FI_BXOR >= attr->op)
			ret = fi_query_atomic(domain, attr->datatype, attr->op,
					      &attr->datatype_attr, flags);
		else
			return -FI_ENOSYS;
		break;
	case FI_ALLTOALL:
	case FI_REDUCE_SCATTER:
	case FI_REDUCE:
	case FI_GATHER:
	default:
		return -FI_ENOSYS;
	}

	if (ret)
		return ret;

	// with the currently implemented software based collective operations,
	// the only restriction is the number of ranks we can address, as
	// limited by the size of the rank portion of the collective tag, which
	// is 31 bits. future collectives may impose further restrictions which
	// will need to update the calculation. For example, operations which
	// require dedicated space in the receive buffer for each rank would
	// limit the number of members by buffer size and value type
	// (8kB buffer / 64B value = 128 member max). hardware may impose
	// further restrictions
	attr->max_members = ~(0x80000000);

	return FI_SUCCESS;
}