/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2014-2020 Intel, Inc. All rights reserved. * Copyright (c) 2014-2019 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014 Artem Y. Polyakov . * All rights reserved. * Copyright (c) 2016 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. * Copyright (c) 2021-2022 Nanook Consulting. All rights reserved. * Copyright (c) 2022 Triad National Security, LLC. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "src/include/pmix_config.h" #include "src/include/pmix_stdint.h" #include "include/pmix.h" #include "src/include/pmix_globals.h" #ifdef HAVE_STRING_H # include #endif #include #ifdef HAVE_UNISTD_H # include #endif #ifdef HAVE_SYS_SOCKET_H # include #endif #ifdef HAVE_SYS_UN_H # include #endif #ifdef HAVE_SYS_UIO_H # include #endif #ifdef HAVE_SYS_TYPES_H # include #endif #include #include "src/class/pmix_list.h" #include "src/mca/bfrops/bfrops.h" #include "src/mca/ptl/ptl.h" #include "src/util/pmix_argv.h" #include "src/util/pmix_error.h" #include "src/util/pmix_output.h" #include "pmix_client_ops.h" static pmix_status_t unpack_return(pmix_buffer_t *data); static pmix_status_t pack_fence(pmix_buffer_t *msg, pmix_cmd_t cmd, const pmix_proc_t *procs, size_t nprocs, const pmix_info_t *info, size_t ninfo); static void wait_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr, pmix_buffer_t *buf, void *cbdata); static void op_cbfunc(pmix_status_t status, void *cbdata); PMIX_EXPORT pmix_status_t PMIx_Fence(const pmix_proc_t procs[], size_t nprocs, const pmix_info_t info[], size_t ninfo) { pmix_cb_t *cb; pmix_status_t rc; PMIX_ACQUIRE_THREAD(&pmix_global_lock); pmix_output_verbose(2, pmix_client_globals.fence_output, "pmix: executing fence"); if (pmix_globals.init_cntr <= 0) { PMIX_RELEASE_THREAD(&pmix_global_lock); return PMIX_ERR_INIT; } /* if we are a singleton, there is nothing to do */ if (pmix_client_globals.singleton) { PMIX_RELEASE_THREAD(&pmix_global_lock); return PMIX_SUCCESS; } /* if we aren't connected, don't attempt to send */ if (!pmix_globals.connected) { PMIX_RELEASE_THREAD(&pmix_global_lock); return PMIX_ERR_UNREACH; } PMIX_RELEASE_THREAD(&pmix_global_lock); /* create a callback object as we need to pass it to the * recv routine so we know which callback to use when * the return message is recvd */ cb = PMIX_NEW(pmix_cb_t); /* push the message into our event base to send to the server */ if (PMIX_SUCCESS != (rc = PMIx_Fence_nb(procs, nprocs, info, ninfo, op_cbfunc, cb))) { PMIX_ERROR_LOG(rc); PMIX_RELEASE(cb); return rc; } /* wait for the fence to complete */ PMIX_WAIT_THREAD(&cb->lock); rc = cb->status; PMIX_RELEASE(cb); pmix_output_verbose(2, pmix_client_globals.fence_output, "pmix: fence released"); return rc; } PMIX_EXPORT pmix_status_t PMIx_Fence_nb(const pmix_proc_t procs[], size_t nprocs, const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata) { pmix_buffer_t *msg; pmix_cmd_t cmd = PMIX_FENCENB_CMD; pmix_status_t rc; pmix_cb_t *cb; pmix_proc_t rg, *rgs; size_t nrg; PMIX_ACQUIRE_THREAD(&pmix_global_lock); pmix_output_verbose(2, pmix_client_globals.fence_output, "pmix: fence_nb called"); if (pmix_globals.init_cntr <= 0) { PMIX_RELEASE_THREAD(&pmix_global_lock); return PMIX_ERR_INIT; } /* if we aren't connected, don't attempt to send */ if (!pmix_globals.connected) { PMIX_RELEASE_THREAD(&pmix_global_lock); return PMIX_ERR_UNREACH; } PMIX_RELEASE_THREAD(&pmix_global_lock); /* check for bozo input */ if (NULL == procs && 0 != nprocs) { return PMIX_ERR_BAD_PARAM; } /* if we are given a NULL proc, then the caller is referencing * all procs within our own nspace */ if (NULL == procs) { pmix_strncpy(rg.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN); rg.rank = PMIX_RANK_WILDCARD; rgs = &rg; nrg = 1; } else { rgs = (pmix_proc_t *) procs; nrg = nprocs; } msg = PMIX_NEW(pmix_buffer_t); if (PMIX_SUCCESS != (rc = pack_fence(msg, cmd, rgs, nrg, info, ninfo))) { PMIX_RELEASE(msg); return rc; } /* create a callback object as we need to pass it to the * recv routine so we know which callback to use when * the return message is recvd */ cb = PMIX_NEW(pmix_cb_t); cb->cbfunc.opfn = cbfunc; cb->cbdata = cbdata; /* push the message into our event base to send to the server */ PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver, msg, wait_cbfunc, (void *) cb); if (PMIX_SUCCESS != rc) { PMIX_RELEASE(msg); PMIX_RELEASE(cb); } return rc; } static pmix_status_t unpack_return(pmix_buffer_t *data) { pmix_status_t rc; pmix_status_t ret; int32_t cnt; pmix_output_verbose(2, pmix_client_globals.fence_output, "client:unpack fence called"); /* unpack the status code */ cnt = 1; PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver, data, &ret, &cnt, PMIX_STATUS); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } pmix_output_verbose(2, pmix_client_globals.fence_output, "client:unpack fence received status %d", ret); /* provide an opportunity to store any data (or at least how to access * any data) that was included in the fence */ PMIX_GDS_RECV_MODEX_COMPLETE(rc, pmix_client_globals.myserver, data); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } return ret; } static pmix_status_t pack_fence(pmix_buffer_t *msg, pmix_cmd_t cmd, const pmix_proc_t *procs, size_t nprocs, const pmix_info_t *info, size_t ninfo) { pmix_status_t rc; /* pack the cmd */ PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &cmd, 1, PMIX_COMMAND); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } /* pack the number of procs */ PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &nprocs, 1, PMIX_SIZE); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } /* pack any provided procs - must always be at least one (our own) */ PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, procs, nprocs, PMIX_PROC); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } /* pack the number of info */ PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, &ninfo, 1, PMIX_SIZE); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } /* pack any provided info - may be NULL */ if (NULL != info && 0 < ninfo) { PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, info, ninfo, PMIX_INFO); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } } return PMIX_SUCCESS; } static void wait_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr, pmix_buffer_t *buf, void *cbdata) { pmix_cb_t *cb = (pmix_cb_t *) cbdata; pmix_status_t rc; PMIX_HIDE_UNUSED_PARAMS(pr, hdr); pmix_output_verbose(2, pmix_client_globals.fence_output, "pmix: fence_nb callback recvd"); if (NULL == cb) { PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); return; } /* a zero-byte buffer indicates that this recv is being * completed due to a lost connection */ if (PMIX_BUFFER_IS_EMPTY(buf)) { rc = PMIX_ERR_UNREACH; } else { rc = unpack_return(buf); } /* if a callback was provided, execute it */ if (NULL != cb->cbfunc.opfn) { cb->cbfunc.opfn(rc, cb->cbdata); } PMIX_RELEASE(cb); } static void op_cbfunc(pmix_status_t status, void *cbdata) { pmix_cb_t *cb = (pmix_cb_t *) cbdata; cb->status = status; PMIX_WAKEUP_THREAD(&cb->lock); }