/*
 * Copyright (c) 2015      Sandia National Laboratories. All rights reserved.
 * Copyright (c) 2015      Bull SAS.  All rights reserved.
 * Copyright (c) 2015      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      IBM Corporation. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "opal/util/bit_ops.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/datatype/ompi_datatype_internal.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"

#include "coll_portals4.h"
#include "coll_portals4_request.h"

/* Standard headers: malloc/free and memcpy/memset are used below. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Arity of the reduction tree: each rank has at most this many children. */
#define COLL_PORTALS4_REDUCE_MAX_CHILDREN 2


/**
 * First half of a reduce: set up and launch the operation for this rank.
 *
 * When is_reduce_optimizable() accepts the datatype/operation pair, the
 * reduction is offloaded to Portals4:
 *   - a k-ary tree (k = COLL_PORTALS4_REDUCE_MAX_CHILDREN) gives this rank
 *     its parent and children;
 *   - non-root ranks work in a private malloc'ed copy of sendbuf (kept in
 *     request->u.reduce.free_buffer), the root works directly in recvbuf;
 *   - a rank with children appends a data ME over that buffer, counted on
 *     trig_ct_h;
 *   - a non-root rank appends an ACK ME (counted on ack_ct_h) and an RTR ME
 *     (counted on trig_ct_h), then posts a triggered atomic towards its
 *     parent that fires once trig_ct_h reaches child_nb + 1;
 *   - each child gets a triggered ACK put and an immediate RTR put;
 *   - in synchronous mode the function waits on the relevant counting
 *     event, otherwise it posts a triggered put to finish_pt_idx carrying
 *     the request pointer as header data.
 *
 * Otherwise the call is forwarded to the previously selected reduce /
 * ireduce handler stored in the module.
 *
 * @param sendbuf  Input buffer (may be MPI_IN_PLACE on the root).
 * @param recvbuf  Output buffer; only used as such on the root.
 * @param count    Number of elements.
 * @param dtype    Element datatype.
 * @param op       Reduction operation.
 * @param root     Rank receiving the result.
 * @param comm     Communicator.
 * @param request  Portals4 collective request; is_sync and (for the async
 *                 fallback) fallback_request must already be set.
 * @param module   Portals4 collective module.
 *
 * @return OMPI_SUCCESS on success; otherwise the value produced by
 *         opal_stderr() for a failed Portals call, OMPI_ERR_OUT_OF_RESOURCE
 *         when the temporary buffer cannot be allocated, OMPI_ERROR when no
 *         fallback handler exists, or whatever the fallback handler returns.
 *
 * NOTE(review): apart from the allocation failure, error returns inside the
 * optimized path do not unwind the Portals resources created so far.
 */
static int
reduce_kary_tree_top(const void *sendbuf, void *recvbuf, int count,
        MPI_Datatype dtype, MPI_Op op,
        int root,
        struct ompi_communicator_t *comm,
        ompi_coll_portals4_request_t *request,
        mca_coll_portals4_module_t *module)
{
    bool is_sync = request->is_sync;
    int ret;
    unsigned int i;
    int size = ompi_comm_size(comm);
    int rank = ompi_comm_rank(comm);
    ptl_rank_t parent, child[COLL_PORTALS4_REDUCE_MAX_CHILDREN];
    size_t internal_count, length;
    ptl_handle_md_t zero_md_h, data_md_h;
    ptl_handle_me_t me_h;
    ptl_me_t me;
    ptl_match_bits_t match_bits_ack, match_bits_rtr, match_bits;
    ptl_ct_event_t ct;
    ptl_op_t ptl_op;
    ptl_datatype_t ptl_dtype;

    request->type = OMPI_COLL_PORTALS4_TYPE_REDUCE;

    /*
     ** Initialization
     */

    for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
        child[i] = PTL_INVALID_RANK;
    }

    parent = PTL_INVALID_RANK;

    zero_md_h = mca_coll_portals4_component.zero_md_h;
    data_md_h = mca_coll_portals4_component.data_md_h;

    /* Per-module sequence number, folded into the match bits below. */
    internal_count = opal_atomic_add_fetch_size_t(&module->coll_count, 1);

    /*
     ** DATATYPE and SIZES
     */
    ret = ompi_datatype_type_size(dtype, &length);
    if (OMPI_SUCCESS != ret) {
        /* length is not trustworthy: do not size buffers or MEs from it */
        return ret;
    }
    length *= count;

    request->u.reduce.is_optim = is_reduce_optimizable(dtype, length, op, &ptl_dtype, &ptl_op);

    if (request->u.reduce.is_optim) {

        /*
         * TOPOLOGY
         */

        /* this function is dependent on the number of segments,
         * if we use segmentation pipe-line is preferred, and
         * binary tree otherwise */

        get_k_ary_tree(COLL_PORTALS4_REDUCE_MAX_CHILDREN,
                rank, size, root, &parent, child, &request->u.reduce.child_nb);

        /*
         * PORTALS4 RESOURCE ALLOCATION
         */

        /* Compute match bits */
        COLL_PORTALS4_SET_BITS(match_bits_ack, ompi_comm_get_local_cid(comm), 1, 0,
                COLL_PORTALS4_REDUCE, 0, internal_count);

        COLL_PORTALS4_SET_BITS(match_bits_rtr, ompi_comm_get_local_cid(comm), 0, 1,
                COLL_PORTALS4_REDUCE, 0, internal_count);

        COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_local_cid(comm), 0, 0,
                COLL_PORTALS4_REDUCE, 0, internal_count);

        if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.trig_ct_h)) != 0) {
            return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
        }

        /* warning : all the operations will be executed on the recvbuf */
        if (rank != root) {
            request->u.reduce.free_buffer = malloc(length);
            if (NULL == request->u.reduce.free_buffer) {
                /* Nothing else has been posted yet: release the counting
                 * event allocated just above instead of leaking it. */
                (void) PtlCTFree(request->u.reduce.trig_ct_h);
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
            recvbuf = (void*)request->u.reduce.free_buffer;

            memcpy(recvbuf, sendbuf, length);
        }
        else {
            request->u.reduce.free_buffer = NULL;
            if (sendbuf != MPI_IN_PLACE) {
                memcpy(recvbuf, sendbuf, length);
            }
        }

        if (request->u.reduce.child_nb) {
            /*
             ** Prepare Data ME
             */
            memset(&me, 0, sizeof(ptl_me_t));
            me.start = recvbuf;
            me.length = length;
            me.ct_handle = request->u.reduce.trig_ct_h;
            me.uid = mca_coll_portals4_component.uid;
            me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
                    PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
                    PTL_ME_EVENT_CT_COMM;
            me.match_id.phys.nid = PTL_NID_ANY;
            me.match_id.phys.pid = PTL_PID_ANY;
            me.match_bits = match_bits;
            me.ignore_bits = 0;

            if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
                    mca_coll_portals4_component.pt_idx,
                    &me, PTL_PRIORITY_LIST, NULL,
                    &request->u.reduce.data_me_h)) != 0) {
                return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
            }
        }

        if (rank != root) {
            request->u.reduce.use_ack_ct_h = true;

            if ((ret = PtlCTAlloc(mca_coll_portals4_component.ni_h, &request->u.reduce.ack_ct_h)) != 0) {
                return opal_stderr("PtlCTAlloc failed", __FILE__, __LINE__, ret);
            }

            /*
             ** Prepare ME for data ACK Put
             ** Priority List
             */

            memset(&me, 0, sizeof(ptl_me_t));
            me.start = NULL;
            me.length = 0;
            me.min_free = 0;
            me.uid = mca_coll_portals4_component.uid;
            me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
                    PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
                    PTL_ME_USE_ONCE |
                    PTL_ME_EVENT_CT_COMM;
            me.match_id.phys.nid = PTL_NID_ANY;
            me.match_id.phys.pid = PTL_PID_ANY;
            me.match_bits = match_bits_ack;
            me.ignore_bits = 0;
            me.ct_handle = request->u.reduce.ack_ct_h;

            if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
                    mca_coll_portals4_component.pt_idx,
                    &me, PTL_PRIORITY_LIST,
                    NULL,
                    &me_h)) != 0) {
                return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
            }

            /*
             ** Prepare ME for sending RTR Put
             ** Priority List, match also with "Overflow list Me" in coll_portals4_component
             */

            memset(&me, 0, sizeof(ptl_me_t));
            me.start = NULL;
            me.length = 0;
            me.min_free = 0;
            me.uid = mca_coll_portals4_component.uid;
            me.options = PTL_ME_OP_PUT | PTL_ME_EVENT_SUCCESS_DISABLE |
                    PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE |
                    PTL_ME_USE_ONCE |
                    PTL_ME_EVENT_CT_COMM | PTL_ME_EVENT_CT_OVERFLOW;
            me.match_id.phys.nid = PTL_NID_ANY;
            me.match_id.phys.pid = PTL_PID_ANY;
            me.match_bits = match_bits_rtr;
            me.ignore_bits = 0;
            me.ct_handle = request->u.reduce.trig_ct_h;

            if ((ret = PtlMEAppend(mca_coll_portals4_component.ni_h,
                    mca_coll_portals4_component.pt_idx,
                    &me, PTL_PRIORITY_LIST,
                    NULL,
                    &me_h)) != 0) {
                return opal_stderr("PtlMEAppend failed", __FILE__, __LINE__, ret);
            }

            /* Send Atomic operation to the parent.
             * Threshold is child_nb + 1: trig_ct_h is the counter of both
             * the data ME and the RTR ME appended above. */
            if ((ret = PtlTriggeredAtomic(data_md_h,
                    (uint64_t)recvbuf,
                    length, PTL_NO_ACK_REQ,
                    ompi_coll_portals4_get_peer(comm, parent),
                    mca_coll_portals4_component.pt_idx,
                    match_bits, 0, NULL, 0,
                    ptl_op, ptl_dtype, request->u.reduce.trig_ct_h,
                    request->u.reduce.child_nb + 1)) != 0) {
                return opal_stderr("PtlTriggeredAtomic failed", __FILE__, __LINE__, ret);
            }
        }
        else {
            request->u.reduce.use_ack_ct_h = false;
        }

        if (request->u.reduce.child_nb) {
            for (i = 0 ; i < COLL_PORTALS4_REDUCE_MAX_CHILDREN ; i++) {
                if (child[i] != PTL_INVALID_RANK) {
                    /*
                     * Prepare Triggered Put to ACK Data to children
                     *
                     */

                    if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
                            ompi_coll_portals4_get_peer(comm, child[i]),
                            mca_coll_portals4_component.pt_idx,
                            match_bits_ack, 0, NULL, 0,
                            request->u.reduce.trig_ct_h,
                            (rank != root) ?
                                    request->u.reduce.child_nb + 1 :
                                    request->u.reduce.child_nb)) != 0) {
                        return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
                    }

                    /*
                     * Send RTR to children
                     *
                     */

                    /* and there, we only send the RTR when all the MEs are ready */
                    if ((ret = PtlPut(zero_md_h, 0, 0, PTL_NO_ACK_REQ,
                            ompi_coll_portals4_get_peer(comm, child[i]),
                            mca_coll_portals4_component.pt_idx,
                            match_bits_rtr, 0, NULL, 0)) != PTL_OK) {
                        return opal_stderr("Put RTR failed %d", __FILE__, __LINE__, ret);
                    }
                }
            }
        }

        if (rank != root) {
            if (is_sync) {
                /* A failed wait is only reported; the function still
                 * returns OMPI_SUCCESS so that the caller runs teardown. */
                if ((ret = PtlCTWait(request->u.reduce.ack_ct_h, 1, &ct)) != 0) {
                    opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
                }
            }
            else {
                /* Self-addressed put on the finish portal index; the
                 * request pointer travels as header data. */
                if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
                        ompi_coll_portals4_get_peer(comm, rank),
                        mca_coll_portals4_component.finish_pt_idx,
                        0, 0, NULL, (uintptr_t) request,
                        request->u.reduce.ack_ct_h,
                        1)) != 0) {
                    return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
                }
            }
        }
        else {
            if (is_sync) {
                /* Same as above: failure is logged, not propagated. */
                if ((ret = PtlCTWait(request->u.reduce.trig_ct_h,
                        request->u.reduce.child_nb, &ct)) != 0) {
                    opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
                }
            }
            else {
                if ((ret = PtlTriggeredPut (zero_md_h, 0, 0, PTL_NO_ACK_REQ,
                        ompi_coll_portals4_get_peer(comm, rank),
                        mca_coll_portals4_component.finish_pt_idx,
                        0, 0, NULL, (uintptr_t) request,
                        request->u.reduce.trig_ct_h,
                        request->u.reduce.child_nb)) != 0) {
                    return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
                }
            }
        }
    }
    else {
        opal_output_verbose(100, ompi_coll_base_framework.framework_output,
                "rank %d - optimization not supported, falling back to previous handler\n", rank);

        if (request->is_sync) {
            if ((module->previous_reduce) && (module->previous_reduce_module)) {
                ret = module->previous_reduce(sendbuf, recvbuf, count, dtype, op, root,
                        comm, module->previous_reduce_module);
            }
            else {
                opal_output_verbose(1, ompi_coll_base_framework.framework_output,
                        "rank %d - no previous reduce handler is available, aborting\n", rank);
                return (OMPI_ERROR);
            }
        }
        else {
            if ((module->previous_ireduce) && (module->previous_ireduce_module)) {
                ret = module->previous_ireduce(sendbuf, recvbuf, count, dtype, op, root,
                        comm, request->fallback_request, module->previous_ireduce_module);
            }
            else {
                opal_output_verbose(1, ompi_coll_base_framework.framework_output,
                        "rank %d - no previous ireduce handler is available, aborting\n", rank);
                return (OMPI_ERROR);
            }
        }
        return ret;
    }
    return (OMPI_SUCCESS);
}


/**
 * Second half of a reduce: release what reduce_kary_tree_top() created.
 *
 * Does nothing when the request did not take the optimized path. Otherwise
 * calls PtlAtomicSync(), frees the ACK counting event (non-root only),
 * unlinks the data ME (ranks with children only, retrying while Portals
 * reports PTL_IN_USE), frees the trigger counting event and finally frees
 * the temporary reduction buffer.
 *
 * Every release step is attempted even if an earlier one failed, so a single
 * failing call no longer leaks the remaining resources. The temporary buffer
 * is the one exception: it backs the data ME, so it is only freed once that
 * ME is known to be unlinked.
 *
 * @param request  Request previously prepared by reduce_kary_tree_top().
 *
 * @return OMPI_SUCCESS, or OMPI_ERROR if any Portals release call failed
 *         (the first failing location is logged).
 */
static int
reduce_kary_tree_bottom(ompi_coll_portals4_request_t *request)
{
    int ret;
    int line = 0;
    int status = OMPI_SUCCESS;
    bool data_me_unlinked = true;

    if (!request->u.reduce.is_optim) {
        return (OMPI_SUCCESS);
    }

    PtlAtomicSync();

    if (request->u.reduce.use_ack_ct_h) {
        ret = PtlCTFree(request->u.reduce.ack_ct_h);
        if (PTL_OK != ret) {
            status = OMPI_ERROR;
            line = __LINE__;
        }
    }

    if (request->u.reduce.child_nb) {
        do {
            ret = PtlMEUnlink(request->u.reduce.data_me_h);
        } while (PTL_IN_USE == ret);
        if (PTL_OK != ret) {
            data_me_unlinked = false;
            if (OMPI_SUCCESS == status) {
                status = OMPI_ERROR;
                line = __LINE__;
            }
        }
    }

    ret = PtlCTFree(request->u.reduce.trig_ct_h);
    if ((PTL_OK != ret) && (OMPI_SUCCESS == status)) {
        status = OMPI_ERROR;
        line = __LINE__;
    }

    if (data_me_unlinked) {
        /* free(NULL) is a no-op, which covers the root rank */
        free(request->u.reduce.free_buffer);
        request->u.reduce.free_buffer = NULL;
    }

    if (OMPI_SUCCESS != status) {
        opal_output(ompi_coll_base_framework.framework_output,
                "%s:%4d:%4d\tError occurred ret=%d",
                __FILE__, __LINE__, line, status);
    }

    return status;
}


/**
 * Blocking reduce entry point of the Portals4 collective component.
 *
 * Allocates a request, runs the top half in synchronous mode and, if that
 * succeeded, the bottom half. The request is returned to its pool on every
 * path, including failures.
 *
 * @return OMPI_SUCCESS, OMPI_ERR_TEMP_OUT_OF_RESOURCE if no request could be
 *         allocated, or the error reported by the top or bottom half.
 */
int
ompi_coll_portals4_reduce_intra(const void *sendbuf, void *recvbuf, int count,
        MPI_Datatype dtype, MPI_Op op,
        int root,
        struct ompi_communicator_t *comm,
        mca_coll_base_module_t *module)
{
    int ret;
    mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
    ompi_coll_portals4_request_t *request;

    OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
    if (NULL == request) {
        opal_output_verbose(1, ompi_coll_base_framework.framework_output,
                "%s:%d: request alloc failed\n",
                __FILE__, __LINE__);
        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
    }

    request->is_sync = true;
    request->fallback_request = NULL;

    ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
            dtype, op, root, comm, request, portals4_module);
    if (OMPI_SUCCESS == ret) {
        /* Teardown is only meaningful after a fully successful setup. */
        ret = reduce_kary_tree_bottom(request);
    }

    /* In synchronous mode nothing in flight refers to the request, so it
     * can always go back to the pool. */
    OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
    return ret;
}


/**
 * Non-blocking reduce entry point of the Portals4 collective component.
 *
 * Allocates a request, publishes it through *ompi_request and runs the top
 * half in asynchronous mode. When the top half delegated to the previous
 * ireduce handler (is_optim false), that handler receives ompi_request via
 * request->fallback_request and the local request is returned to its pool
 * immediately. On failure the local request is returned as well, and
 * *ompi_request is reset if it still designates it.
 *
 * @return OMPI_SUCCESS, OMPI_ERR_TEMP_OUT_OF_RESOURCE if no request could be
 *         allocated, or the error reported by the top half.
 */
int
ompi_coll_portals4_ireduce_intra(const void* sendbuf, void* recvbuf, int count,
        MPI_Datatype dtype, MPI_Op op,
        int root,
        struct ompi_communicator_t *comm,
        ompi_request_t ** ompi_request,
        mca_coll_base_module_t *module)
{
    int ret;
    mca_coll_portals4_module_t *portals4_module = (mca_coll_portals4_module_t*) module;
    ompi_coll_portals4_request_t *request;

    OMPI_COLL_PORTALS4_REQUEST_ALLOC(comm, request);
    if (NULL == request) {
        opal_output_verbose(1, ompi_coll_base_framework.framework_output,
                "%s:%d: request alloc failed\n",
                __FILE__, __LINE__);
        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
    }

    *ompi_request = &request->super;
    request->fallback_request = ompi_request;
    request->is_sync = false;

    ret = reduce_kary_tree_top(sendbuf, recvbuf, count,
            dtype, op, root, comm, request, portals4_module);
    if (OMPI_SUCCESS != ret) {
        /* The finishing triggered put is the last thing the top half posts,
         * so after any failure nothing in flight carries the request
         * pointer: it can be recycled instead of leaked. */
        if (*ompi_request == &request->super) {
            /* do not hand a recycled request back to the caller */
            *ompi_request = MPI_REQUEST_NULL;
        }
        OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
        return ret;
    }

    if (!request->u.reduce.is_optim) {
        OMPI_COLL_PORTALS4_REQUEST_RETURN(request);
    }

    opal_output_verbose(10, ompi_coll_base_framework.framework_output, "ireduce");
    return (OMPI_SUCCESS);
}


/**
 * Completion step of a non-blocking reduce: run the bottom half and mark
 * the request complete.
 *
 * @param request  Request started by ompi_coll_portals4_ireduce_intra().
 *
 * @return OMPI_SUCCESS, or the error returned by the bottom half.
 *
 * NOTE(review): when the bottom half fails the request is never completed;
 * confirm that the caller handles the returned error.
 */
int
ompi_coll_portals4_ireduce_intra_fini(ompi_coll_portals4_request_t *request)
{
    int ret;

    ret = reduce_kary_tree_bottom(request);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    ompi_request_complete(&request->super, true);

    return (OMPI_SUCCESS);
}