/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2011 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2015-2020 Intel, Inc. All rights reserved. * Copyright (c) 2017 IBM Corporation. All rights reserved. * Copyright (c) 2017 Mellanox Technologies. All rights reserved. * Copyright (c) 2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2021-2022 Nanook Consulting. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ /** * @file * * I/O Forwarding Service */ #ifndef PMIX_IOF_H #define PMIX_IOF_H #include "src/include/pmix_config.h" #ifdef HAVE_SYS_TYPES_H # include #endif #ifdef HAVE_SYS_UIO_H # include #endif #ifdef HAVE_NET_UIO_H # include #endif #ifdef HAVE_UNISTD_H # include #endif #include #include "src/class/pmix_list.h" #include "src/include/pmix_globals.h" #include "src/util/pmix_fd.h" BEGIN_C_DECLS /* * Maximum size of single msg */ #define PMIX_IOF_BASE_MSG_MAX 8192 #define PMIX_IOF_BASE_TAG_MAX 1024 #define PMIX_IOF_MAX_INPUT_BUFFERS 50 #define PMIX_IOF_MAX_RETRIES 4 typedef struct { pmix_list_item_t super; bool pending; bool always_writable; int numtries; pmix_event_t *ev; struct timeval tv; int fd; pmix_list_t outputs; } pmix_iof_write_event_t; PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_event_t); #define PMIX_IOF_WRITE_EVENT_STATIC_INIT \ { \ .super = PMIX_LIST_ITEM_STATIC_INIT, \ .pending = false, \ .always_writable = false, \ .numtries = 0, \ .ev = NULL, \ .tv = {0, 0}, \ .fd = 0, \ .outputs = PMIX_LIST_STATIC_INIT \ } typedef struct { pmix_list_item_t super; pmix_proc_t name; pmix_iof_channel_t tag; pmix_iof_write_event_t wev; bool xoff; bool exclusive; bool closed; } pmix_iof_sink_t; PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_sink_t); #define PMIX_IOF_SINK_STATIC_INIT \ { \ .super = PMIX_LIST_ITEM_STATIC_INIT, \ .name = {{0}, 0}, \ .tag = PMIX_FWD_NO_CHANNELS, \ .wev = PMIX_IOF_WRITE_EVENT_STATIC_INIT, \ .xoff = false, \ .exclusive = false, \ .closed = false \ } typedef struct { pmix_list_item_t super; char *data; int numbytes; } pmix_iof_write_output_t; PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_write_output_t); typedef struct { pmix_object_t super; pmix_event_t ev; struct timeval tv; int fd; bool active; void *childproc; bool always_readable; pmix_proc_t name; pmix_iof_channel_t channel; pmix_proc_t *targets; size_t ntargets; pmix_info_t *directives; size_t ndirs; } pmix_iof_read_event_t; PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_read_event_t); typedef struct { pmix_list_item_t super; pmix_proc_t name; pmix_iof_write_event_t *channel; pmix_iof_flags_t flags; pmix_iof_channel_t stream; bool copystdout; bool copystderr; pmix_byte_object_t bo; } pmix_iof_residual_t; PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_iof_residual_t); /* Write event macro's */ static inline bool pmix_iof_fd_always_ready(int fd) { return pmix_fd_is_regular(fd) || (pmix_fd_is_chardev(fd) && !isatty(fd)) || pmix_fd_is_blkdev(fd); } #define PMIX_IOF_SINK_BLOCKSIZE (1024) #define PMIX_IOF_SINK_ACTIVATE(w) \ do { \ struct timeval *tv = NULL; \ (w)->pending = true; \ PMIX_POST_OBJECT((w)); \ if ((w)->always_writable) { \ /* Regular is always write ready. Use timer to activate */ \ tv = &(w)->tv; \ } \ if (pmix_event_add((w)->ev, tv)) { \ PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); \ } \ } while (0); /* define an output "sink", adding it to the provided * endpoint list for this proc */ #define PMIX_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr) \ do { \ pmix_output_verbose(1, pmix_client_globals.iof_output, \ "defining endpt: file %s line %d fd %d", __FILE__, __LINE__, (fid)); \ PMIX_CONSTRUCT((snk), pmix_iof_sink_t); \ pmix_strncpy((snk)->name.nspace, (nm)->nspace, PMIX_MAX_NSLEN); \ (snk)->name.rank = (nm)->rank; \ (snk)->tag = (tg); \ if (0 <= (fid)) { \ (snk)->wev.fd = (fid); \ (snk)->wev.always_writable = pmix_iof_fd_always_ready(fid); \ if ((snk)->wev.always_writable) { \ pmix_event_evtimer_set(pmix_globals.evbase, (snk)->wev.ev, wrthndlr, (snk)); \ } else { \ pmix_event_set(pmix_globals.evbase, (snk)->wev.ev, (snk)->wev.fd, PMIX_EV_WRITE, \ wrthndlr, (snk)); \ } \ } \ PMIX_POST_OBJECT(snk); \ } while (0); /* Read event macro's */ #define PMIX_IOF_READ_ADDEV(rev) \ do { \ struct timeval *tv = NULL; \ if ((rev)->always_readable) { \ tv = &(rev)->tv; \ } \ if (pmix_event_add(&(rev)->ev, tv)) { \ PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); \ } \ } while (0); #define PMIX_IOF_READ_ACTIVATE(rev) \ do { \ (rev)->active = true; \ PMIX_POST_OBJECT(rev); \ PMIX_IOF_READ_ADDEV(rev); \ } while (0); #define PMIX_IOF_READ_EVENT(rv, p, np, d, nd, fid, cbfunc, actv) \ do { \ size_t _ii; \ pmix_iof_read_event_t *rev; \ pmix_output_verbose(1, pmix_client_globals.iof_output, "defining read event at: %s %d", \ __FILE__, __LINE__); \ rev = PMIX_NEW(pmix_iof_read_event_t); \ if (NULL != (p)) { \ (rev)->ntargets = (np); \ PMIX_PROC_CREATE((rev)->targets, (rev)->ntargets); \ memcpy((rev)->targets, (p), (np) * sizeof(pmix_proc_t)); \ } \ if (NULL != (d) && 0 < (nd)) { \ PMIX_INFO_CREATE((rev)->directives, (nd)); \ (rev)->ndirs = (nd); \ for (_ii = 0; _ii < (size_t) nd; _ii++) { \ PMIX_INFO_XFER(&((rev)->directives[_ii]), &((d)[_ii])); \ } \ } \ rev->fd = (fid); \ rev->always_readable = pmix_iof_fd_always_ready(fid); \ *(rv) = rev; \ if (rev->always_readable) { \ pmix_event_evtimer_set(pmix_globals.evbase, &rev->ev, (cbfunc), rev); \ } else { \ pmix_event_set(pmix_globals.evbase, &rev->ev, (fid), PMIX_EV_READ, (cbfunc), rev); \ } \ if ((actv)) { \ PMIX_IOF_READ_ACTIVATE(rev) \ } \ } while (0); #define PMIX_IOF_READ_EVENT_LOCAL(rv, fid, cbfunc, actv) \ do { \ pmix_iof_read_event_t *rev; \ pmix_output_verbose(1, pmix_client_globals.iof_output, "defining read event at: %s %d", \ __FILE__, __LINE__); \ rev = PMIX_NEW(pmix_iof_read_event_t); \ rev->fd = (fid); \ rev->always_readable = pmix_iof_fd_always_ready(fid); \ *(rv) = rev; \ if (rev->always_readable) { \ pmix_event_evtimer_set(pmix_globals.evbase, &rev->ev, (cbfunc), rev); \ } else { \ pmix_event_set(pmix_globals.evbase, &rev->ev, (fid), PMIX_EV_READ, (cbfunc), rev); \ } \ if ((actv)) { \ PMIX_IOF_READ_ACTIVATE(rev) \ } \ } while (0); PMIX_EXPORT pmix_status_t pmix_iof_flush(void); PMIX_EXPORT pmix_status_t pmix_iof_write_output(const pmix_proc_t *name, pmix_iof_channel_t stream, const pmix_byte_object_t *bo); PMIX_EXPORT void pmix_iof_static_dump_output(pmix_iof_sink_t *sink); PMIX_EXPORT void pmix_iof_write_handler(int fd, short event, void *cbdata); PMIX_EXPORT bool pmix_iof_stdin_check(int fd); PMIX_EXPORT void pmix_iof_read_local_handler(int unusedfd, short event, void *cbdata); PMIX_EXPORT void pmix_iof_stdin_cb(int fd, short event, void *cbdata); PMIX_EXPORT pmix_status_t pmix_iof_process_iof(pmix_iof_channel_t channels, const pmix_proc_t *source, const pmix_byte_object_t *bo, const pmix_info_t *info, size_t ninfo, const pmix_iof_req_t *req); PMIX_EXPORT void pmix_iof_check_flags(pmix_info_t *info, pmix_iof_flags_t *flags); PMIX_EXPORT void pmix_iof_flush_residuals(void); END_C_DECLS #endif /* PMIX_IOF_H */