/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2021 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006-2020 Cisco Systems, Inc. All rights reserved * Copyright (c) 2007-2009 Sun Microsystems, Inc. All rights reserved. * Copyright (c) 2007-2017 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2013-2020 Intel, Inc. All rights reserved. * Copyright (c) 2015-2019 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2020 Geoffroy Vallee. All rights reserved. * Copyright (c) 2020 IBM Corporation. All rights reserved. * Copyright (c) 2021-2024 Nanook Consulting All rights reserved. * Copyright (c) 2021 Amazon.com, Inc. or its affiliates. All Rights * reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "prte_config.h" #include "src/include/constants.h" #include "src/include/version.h" #include #include #include #ifdef HAVE_STRINGS_H # include #endif /* HAVE_STRINGS_H */ #ifdef HAVE_UNISTD_H # include #endif #ifdef HAVE_SYS_PARAM_H # include #endif #include #include #include #ifdef HAVE_SYS_TYPES_H # include #endif /* HAVE_SYS_TYPES_H */ #ifdef HAVE_SYS_WAIT_H # include #endif /* HAVE_SYS_WAIT_H */ #ifdef HAVE_SYS_TIME_H # include #endif /* HAVE_SYS_TIME_H */ #include #ifdef HAVE_SYS_STAT_H # include #endif #ifdef HAVE_POLL_H # include #endif #include "src/event/event-internal.h" #include "src/mca/base/pmix_base.h" #include "src/mca/prteinstalldirs/prteinstalldirs.h" #include "src/pmix/pmix-internal.h" #include "src/threads/pmix_mutex.h" #include "src/util/daemon_init.h" #include "src/util/pmix_argv.h" #include "src/util/pmix_basename.h" #include "src/util/prte_cmd_line.h" #include "src/util/pmix_fd.h" #include "src/util/pmix_os_path.h" #include "src/util/pmix_output.h" #include "src/util/pmix_path.h" #include "src/util/pmix_printf.h" #include "src/util/pmix_environ.h" #include "src/util/pmix_getcwd.h" #include "src/util/pmix_show_help.h" #include "src/class/pmix_pointer_array.h" #include "src/runtime/prte_progress_threads.h" #include "src/mca/errmgr/errmgr.h" #include "src/mca/ess/base/base.h" #include "src/mca/schizo/base/base.h" #include "src/mca/state/base/base.h" #include "src/prted/prted.h" #include "src/runtime/prte_globals.h" #include "src/runtime/runtime.h" #include "src/prted/prted.h" typedef struct { prte_pmix_lock_t lock; pmix_info_t *info; size_t ninfo; } mylock_t; static pmix_nspace_t spawnednspace; static size_t evid = INT_MAX; static pmix_proc_t myproc; static bool verbose = false; static pmix_list_t forwarded_signals; static void signal_forward_callback(int signal); static void regcbfunc(pmix_status_t status, size_t ref, void *cbdata) { prte_pmix_lock_t *lock = (prte_pmix_lock_t *) cbdata; PRTE_HIDE_UNUSED_PARAMS(status); PMIX_ACQUIRE_OBJECT(lock); evid = ref; PRTE_PMIX_WAKEUP_THREAD(lock); } static void opcbfunc(pmix_status_t status, void *cbdata) { prte_pmix_lock_t *lock = (prte_pmix_lock_t *) cbdata; PRTE_HIDE_UNUSED_PARAMS(status); PMIX_ACQUIRE_OBJECT(lock); PRTE_PMIX_WAKEUP_THREAD(lock); } static void defhandler(size_t evhdlr_registration_id, pmix_status_t status, const pmix_proc_t *source, pmix_info_t info[], size_t ninfo, pmix_info_t *results, size_t nresults, pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata) { prte_pmix_lock_t *lock = NULL; size_t n; pmix_status_t rc; PRTE_HIDE_UNUSED_PARAMS(evhdlr_registration_id, source, results, nresults); if (verbose) { pmix_output(0, "PRUN: DEFHANDLER WITH STATUS %s(%d)", PMIx_Error_string(status), status); } if (PMIX_ERR_IOF_FAILURE == status) { pmix_proc_t target; pmix_info_t directive; /* tell PRTE to terminate our job */ PMIX_LOAD_PROCID(&target, prte_process_info.myproc.nspace, PMIX_RANK_WILDCARD); PMIX_INFO_LOAD(&directive, PMIX_JOB_CTRL_KILL, NULL, PMIX_BOOL); rc = PMIx_Job_control_nb(&target, 1, &directive, 1, NULL, NULL); if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) { PMIx_tool_finalize(); /* exit with a non-zero status */ exit(1); } goto progress; } if (PMIX_ERR_UNREACH == status || PMIX_ERR_LOST_CONNECTION == status) { /* we should always have info returned to us - if not, there is * nothing we can do */ if (NULL != info) { for (n = 0; n < ninfo; n++) { if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_RETURN_OBJECT)) { lock = (prte_pmix_lock_t *) info[n].value.data.ptr; } } } if (NULL == lock) { exit(1); } /* save the status */ lock->status = status; /* release the lock */ PRTE_PMIX_WAKEUP_THREAD(lock); } progress: /* we _always_ have to execute the evhandler callback or * else the event progress engine will hang */ if (NULL != cbfunc) { cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); } } static void evhandler(size_t evhdlr_registration_id, pmix_status_t status, const pmix_proc_t *source, pmix_info_t info[], size_t ninfo, pmix_info_t *results, size_t nresults, pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata) { prte_pmix_lock_t *lock = NULL; int jobstatus = 0; pmix_nspace_t jobid = {0}; size_t n; char *msg = NULL; PRTE_HIDE_UNUSED_PARAMS(evhdlr_registration_id, source, results, nresults); if (verbose) { pmix_output(0, "PRUN: EVHANDLER WITH STATUS %s(%d)", PMIx_Error_string(status), status); } /* we should always have info returned to us - if not, there is * nothing we can do */ if (NULL != info) { for (n = 0; n < ninfo; n++) { if (0 == strncmp(info[n].key, PMIX_JOB_TERM_STATUS, PMIX_MAX_KEYLEN)) { jobstatus = prte_pmix_convert_status(info[n].value.data.status); } else if (0 == strncmp(info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) { PMIX_LOAD_NSPACE(jobid, info[n].value.data.proc->nspace); } else if (0 == strncmp(info[n].key, PMIX_EVENT_RETURN_OBJECT, PMIX_MAX_KEYLEN)) { lock = (prte_pmix_lock_t *) info[n].value.data.ptr; } else if (0 == strncmp(info[n].key, PMIX_EVENT_TEXT_MESSAGE, PMIX_MAX_KEYLEN)) { msg = info[n].value.data.string; } } if (verbose && PMIX_CHECK_NSPACE(jobid, spawnednspace)) { pmix_output(0, "JOB %s COMPLETED WITH STATUS %d", PRTE_JOBID_PRINT(jobid), jobstatus); } } if (NULL != lock) { /* save the status */ lock->status = jobstatus; if (NULL != msg) { lock->msg = strdup(msg); } /* release the lock */ PRTE_PMIX_WAKEUP_THREAD(lock); } /* we _always_ have to execute the evhandler callback or * else the event progress engine will hang */ if (NULL != cbfunc) { cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); } } static void debug_cbfunc(size_t evhdlr_registration_id, pmix_status_t status, const pmix_proc_t *source, pmix_info_t info[], size_t ninfo, pmix_info_t *results, size_t nresults, pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata) { PRTE_HIDE_UNUSED_PARAMS(evhdlr_registration_id, status, source, info, ninfo, results, nresults); /* we _always_ have to execute the evhandler callback or * else the event progress engine will hang */ if (NULL != cbfunc) { cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); } } static void setupcbfunc(pmix_status_t status, pmix_info_t info[], size_t ninfo, void *provided_cbdata, pmix_op_cbfunc_t cbfunc, void *cbdata) { mylock_t *mylock = (mylock_t *) provided_cbdata; size_t n; PRTE_HIDE_UNUSED_PARAMS(status); if (NULL != info) { mylock->ninfo = ninfo; PMIX_INFO_CREATE(mylock->info, mylock->ninfo); /* cycle across the provided info */ for (n = 0; n < ninfo; n++) { PMIX_INFO_XFER(&mylock->info[n], &info[n]); } } else { mylock->info = NULL; mylock->ninfo = 0; } /* release the caller */ if (NULL != cbfunc) { cbfunc(PMIX_SUCCESS, cbdata); } PRTE_PMIX_WAKEUP_THREAD(&mylock->lock); } static int wait_pipe[2]; static int wait_dvm(pid_t pid) { char reply; int rc; int status; close(wait_pipe[1]); do { rc = read(wait_pipe[0], &reply, 1); } while (0 > rc && EINTR == errno); if (1 == rc && 'K' == reply) { return 0; } else if (0 == rc) { waitpid(pid, &status, 0); if (WIFEXITED(status)) { return WEXITSTATUS(status); } } return 255; } int prun_common(pmix_cli_result_t *results, prte_schizo_base_module_t *schizo, int pargc, char **pargv) { int rc = 1, i; char *param, *ptr; prte_pmix_lock_t lock, rellock; pmix_list_t apps; prte_pmix_app_t *app; void *tinfo, *jinfo; pmix_info_t info, *iptr; pmix_proc_t pname; pmix_status_t ret; bool flag; size_t n, ninfo; pmix_app_t *papps = NULL; size_t napps; mylock_t mylock; uint32_t ui32; pid_t pid; prte_ess_base_signal_t *sig; prte_event_list_item_t *evitm; pmix_value_t *val; pmix_data_array_t darray; char hostname[PRTE_PATH_MAX]; pmix_rank_t rank; pmix_status_t code; pmix_proc_t parent; pmix_cli_item_t *opt; PRTE_HIDE_UNUSED_PARAMS(pargc); /* init the globals */ PMIX_CONSTRUCT(&apps, pmix_list_t); PMIX_CONSTRUCT(&forwarded_signals, pmix_list_t); gethostname(hostname, sizeof(hostname)); /* detach from controlling terminal * otherwise, remain attached so output can get to us */ if (pmix_cmd_line_is_taken(results, PRTE_CLI_DAEMONIZE)) { pipe(wait_pipe); prte_state_base.parent_fd = wait_pipe[1]; prte_daemon_init_callback(NULL, wait_dvm); close(wait_pipe[0]); } else { #if defined(HAVE_SETSID) /* see if we were directed to separate from current session */ if (pmix_cmd_line_is_taken(results, PRTE_CLI_SET_SID)) { setsid(); } #endif } /** setup callbacks for signals we should forward */ PMIX_CONSTRUCT(&prte_ess_base_signals, pmix_list_t); opt = pmix_cmd_line_get_param(results, PRTE_CLI_FWD_SIGNALS); if (NULL != opt) { param = opt->values[0]; } else { param = NULL; } if (PRTE_SUCCESS != (rc = prte_ess_base_setup_signals(param))) { return rc; } PMIX_LIST_FOREACH(sig, &prte_ess_base_signals, prte_ess_base_signal_t) { signal(sig->signal, signal_forward_callback); } /* setup the job data global table */ prte_job_data = PMIX_NEW(pmix_pointer_array_t); ret = pmix_pointer_array_init(prte_job_data, PRTE_GLOBAL_ARRAY_BLOCK_SIZE, PRTE_GLOBAL_ARRAY_MAX_SIZE, PRTE_GLOBAL_ARRAY_BLOCK_SIZE); if (PRTE_SUCCESS != ret) { PRTE_ERROR_LOG(ret); return rc; } /* setup options */ PMIX_INFO_LIST_START(tinfo); /* tell PMIx what our name should be */ if (NULL != (param = getenv("PMIX_NAMESPACE"))) { PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_TOOL_NSPACE, param, PMIX_STRING); } else { pmix_asprintf(¶m, "%s.%s.%lu", prte_tool_basename, hostname, (unsigned long)getpid()); PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_TOOL_NSPACE, param, PMIX_STRING); free(param); } if (NULL != (param = getenv("PMIX_RANK"))) { rank = strtoul(param, NULL, 10); } else { rank = 0; } PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_TOOL_RANK, &rank, PMIX_PROC_RANK); if (pmix_cmd_line_is_taken(results, PRTE_CLI_DO_NOT_CONNECT)) { PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_TOOL_DO_NOT_CONNECT, NULL, PMIX_BOOL); } else if (pmix_cmd_line_is_taken(results, PRTE_CLI_SYS_SERVER_FIRST)) { PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_CONNECT_SYSTEM_FIRST, NULL, PMIX_BOOL); } else if (pmix_cmd_line_is_taken(results, PRTE_CLI_SYS_SERVER_ONLY)) { PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_CONNECT_TO_SYSTEM, NULL, PMIX_BOOL); } opt = pmix_cmd_line_get_param(results, PRTE_CLI_WAIT_TO_CONNECT); if (NULL != opt) { ui32 = strtol(opt->values[0], NULL, 10); PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_CONNECT_RETRY_DELAY, &ui32, PMIX_UINT32); } opt = pmix_cmd_line_get_param(results, PRTE_CLI_NUM_CONNECT_RETRIES); if (NULL != opt) { ui32 = strtol(opt->values[0], NULL, 10); PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_CONNECT_MAX_RETRIES, &ui32, PMIX_UINT32); } opt = pmix_cmd_line_get_param(results, PRTE_CLI_PID); if (NULL != opt) { /* see if it is an integer value */ char *leftover; leftover = NULL; pid = strtol(opt->values[0], &leftover, 10); if (NULL == leftover || 0 == strlen(leftover)) { /* it is an integer */ PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_SERVER_PIDINFO, &pid, PMIX_PID); } else if (0 == strncasecmp(opt->values[0], "file", 4)) { FILE *fp; /* step over the file: prefix */ param = strchr(opt->values[0], ':'); if (NULL == param) { /* malformed input */ pmix_show_help("help-prun.txt", "bad-option-input", true, prte_tool_basename, "--pid", opt->values[0], "file:path"); return PRTE_ERR_BAD_PARAM; } ++param; fp = fopen(param, "r"); if (NULL == fp) { pmix_show_help("help-prun.txt", "file-open-error", true, prte_tool_basename, "--pid", opt->values[0], param); return PRTE_ERR_BAD_PARAM; } rc = fscanf(fp, "%lu", (unsigned long *) &pid); if (1 != rc) { /* if we were unable to obtain the single conversion we * require, then error out */ pmix_show_help("help-prun.txt", "bad-file", true, prte_tool_basename, "--pid", opt->values[0], param); fclose(fp); return PRTE_ERR_BAD_PARAM; } fclose(fp); PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_SERVER_PIDINFO, &pid, PMIX_PID); } else { /* a string that's neither an integer nor starts with 'file:' */ pmix_show_help("help-prun.txt", "bad-option-input", true, prte_tool_basename, "--pid", opt->values[0], "file:path"); return PRTE_ERR_BAD_PARAM; } } opt = pmix_cmd_line_get_param(results, PRTE_CLI_NAMESPACE); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_SERVER_NSPACE, opt->values[0], PMIX_STRING); } /* set our session directory to something hopefully unique so * our rendezvous files don't conflict with other prun/prte * instances */ pmix_asprintf(&ptr, "%s/%s.session.%s.%lu.%lu", pmix_tmp_directory(), prte_tool_basename, prte_process_info.nodename, (unsigned long) geteuid(), (unsigned long) getpid()); PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_SERVER_TMPDIR, ptr, PMIX_STRING); free(ptr); /* we are also a launcher, so pass that down so PMIx knows * to setup rendezvous points */ PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_LAUNCHER, NULL, PMIX_BOOL); /* we always support tool rendezvous */ PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL); /* if they specified the URI, then pass it along */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_DVM_URI); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_SERVER_URI, opt->values[0], PMIX_STRING); } /* output all IOF */ PMIX_INFO_LIST_ADD(ret, tinfo, PMIX_IOF_LOCAL_OUTPUT, NULL, PMIX_BOOL); /* convert to array of info */ PMIX_INFO_LIST_CONVERT(ret, tinfo, &darray); iptr = (pmix_info_t *) darray.array; ninfo = darray.size; PMIX_INFO_LIST_RELEASE(tinfo); /* now initialize PMIx */ if (PMIX_SUCCESS != (ret = PMIx_tool_init(&myproc, iptr, ninfo))) { fprintf(stderr, "%s failed to initialize, likely due to no DVM being available\n", prte_tool_basename); exit(1); } PMIX_INFO_FREE(iptr, ninfo); /* register a default event handler and pass it our release lock * so we can cleanly exit if the server goes away */ PRTE_PMIX_CONSTRUCT_LOCK(&rellock); PMIX_INFO_CREATE(iptr, 2); PMIX_INFO_LOAD(&iptr[1], PMIX_EVENT_RETURN_OBJECT, &rellock, PMIX_POINTER); PMIX_INFO_LOAD(&iptr[0], PMIX_EVENT_HDLR_NAME, "DEFAULT", PMIX_STRING); PRTE_PMIX_CONSTRUCT_LOCK(&lock); PMIx_Register_event_handler(NULL, 0, iptr, 2, defhandler, regcbfunc, &lock); PRTE_PMIX_WAIT_THREAD(&lock); PRTE_PMIX_DESTRUCT_LOCK(&lock); PMIX_INFO_FREE(iptr, 2); /***** CONSTRUCT THE APP'S JOB-INFO ****/ PMIX_INFO_LIST_START(jinfo); PMIX_LOAD_PROCID(&parent, prte_process_info.myproc.nspace, prte_process_info.myproc.rank); /***** CHECK FOR LAUNCH DIRECTIVES - ADD THEM TO JOB INFO IF FOUND ****/ PMIX_LOAD_PROCID(&pname, myproc.nspace, PMIX_RANK_WILDCARD); PMIX_INFO_LOAD(&info, PMIX_OPTIONAL, NULL, PMIX_BOOL); ret = PMIx_Get(&pname, PMIX_LAUNCH_DIRECTIVES, &info, 1, &val); PMIX_INFO_DESTRUCT(&info); if (PMIX_SUCCESS == ret) { iptr = (pmix_info_t *) val->data.darray->array; ninfo = val->data.darray->size; for (n = 0; n < ninfo; n++) { PMIX_INFO_LIST_XFER(ret, jinfo, &iptr[n]); } PMIX_VALUE_RELEASE(val); } /* we want to be notified upon job completion */ PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_NOTIFY_COMPLETION, &flag, PMIX_BOOL); /* pass the personality */ PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_PERSONALITY, schizo->name, PMIX_STRING); /* get display options */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_DISPLAY); if (NULL != opt) { ret = prte_schizo_base_parse_display(opt, jinfo); if (PRTE_SUCCESS != ret) { PRTE_UPDATE_EXIT_STATUS(PRTE_ERR_FATAL); goto DONE; } } /* check for output options */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_OUTPUT); if (NULL != opt) { ret = prte_schizo_base_parse_output(opt, jinfo); if (PRTE_SUCCESS != ret) { PRTE_UPDATE_EXIT_STATUS(PRTE_ERR_FATAL); goto DONE; } } /* check for runtime options */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_RTOS); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_RUNTIME_OPTIONS, opt->values[0], PMIX_STRING); } /* check what user wants us to do with stdin */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_STDIN); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_STDIN_TGT, opt->values[0], PMIX_STRING); } opt = pmix_cmd_line_get_param(results, PRTE_CLI_MAPBY); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_MAPBY, opt->values[0], PMIX_STRING); } /* if the user specified a ranking policy, then set it */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_RANKBY); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_RANKBY, opt->values[0], PMIX_STRING); } /* if the user specified a binding policy, then set it */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_BINDTO); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_BINDTO, opt->values[0], PMIX_STRING); } /* check for an exec agent */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_EXEC_AGENT); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_EXEC_AGENT, opt->values[0], PMIX_STRING); } /* mark if recovery was enabled on the cmd line */ if (pmix_cmd_line_is_taken(results, PRTE_CLI_ENABLE_RECOVERY)) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_JOB_RECOVERABLE, &flag, PMIX_BOOL); } /* record the max restarts */ opt = pmix_cmd_line_get_param(results, PRTE_CLI_MAX_RESTARTS); if (NULL != opt) { ui32 = strtol(opt->values[0], NULL, 10); PMIX_LIST_FOREACH(app, &apps, prte_pmix_app_t) { PMIX_INFO_LIST_ADD(ret, app->info, PMIX_MAX_RESTARTS, &ui32, PMIX_UINT32); } } /* if continuous operation was specified */ if (pmix_cmd_line_is_taken(results, PRTE_CLI_CONTINUOUS)) { /* mark this job as continuously operating */ PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_JOB_CONTINUOUS, &flag, PMIX_BOOL); } #ifdef PMIX_ABORT_NONZERO_EXIT /* if ignore non-zero exit was specified */ if (pmix_cmd_line_is_taken(results, PRTE_CLI_TERM_NONZERO)) { /* mark this job to not terminate if a proc exits with non-zero status */ PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_ABORT_NONZERO_EXIT, NULL, PMIX_BOOL); } #endif /* if stop-on-exec was specified */ if (pmix_cmd_line_is_taken(results, PRTE_CLI_STOP_ON_EXEC)) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_DEBUG_STOP_ON_EXEC, NULL, PMIX_BOOL); } /* check for a job timeout specification, to be provided in seconds * as that is what MPICH used */ i = 0; opt = pmix_cmd_line_get_param(results, PRTE_CLI_TIMEOUT); if (NULL != opt) { i = strtol(opt->values[0], NULL, 10); } else if (NULL != (param = getenv("MPIEXEC_TIMEOUT"))) { i = strtol(param, NULL, 10); } if (0 != i) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_JOB_TIMEOUT, &i, PMIX_INT); } if (pmix_cmd_line_is_taken(results, PRTE_CLI_STACK_TRACES)) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_TIMEOUT_STACKTRACES, &flag, PMIX_BOOL); } if (pmix_cmd_line_is_taken(results, PRTE_CLI_REPORT_STATE)) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_TIMEOUT_REPORT_STATE, &flag, PMIX_BOOL); } opt = pmix_cmd_line_get_param(results, PRTE_CLI_SPAWN_TIMEOUT); if (NULL != opt) { i = strtol(opt->values[0], NULL, 10); PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_SPAWN_TIMEOUT, &i, PMIX_INT); } opt = pmix_cmd_line_get_param(results, PRTE_CLI_DO_NOT_AGG_HELP); if (NULL != opt) { flag = false; PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_LOG_AGG, &flag, PMIX_BOOL); } #ifdef PMIX_MEM_ALLOC_KIND opt = pmix_cmd_line_get_param(results, PRTE_CLI_MEM_ALLOC_KIND); if (NULL != opt) { PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_MEM_ALLOC_KIND, opt->values[0], PMIX_STRING); } #endif /* give the schizo components a chance to add to the job info */ schizo->job_info(results, jinfo); /* pickup any relevant envars */ ninfo = 4; PMIX_INFO_CREATE(iptr, ninfo); flag = true; PMIX_INFO_LOAD(&iptr[0], PMIX_SETUP_APP_ENVARS, &flag, PMIX_BOOL); ui32 = geteuid(); PMIX_INFO_LOAD(&iptr[1], PMIX_USERID, &ui32, PMIX_UINT32); ui32 = getegid(); PMIX_INFO_LOAD(&iptr[2], PMIX_GRPID, &ui32, PMIX_UINT32); PMIX_INFO_LOAD(&iptr[3], PMIX_PERSONALITY, schizo->name, PMIX_STRING); PRTE_PMIX_CONSTRUCT_LOCK(&mylock.lock); ret = PMIx_server_setup_application(prte_process_info.myproc.nspace, iptr, ninfo, setupcbfunc, &mylock); if (PMIX_SUCCESS != ret) { PMIX_ERROR_LOG(ret); PRTE_PMIX_DESTRUCT_LOCK(&mylock.lock); PRTE_UPDATE_EXIT_STATUS(ret); goto DONE; } PRTE_PMIX_WAIT_THREAD(&mylock.lock); PMIX_INFO_FREE(iptr, ninfo); PRTE_PMIX_DESTRUCT_LOCK(&mylock.lock); /* transfer any returned ENVARS to the job_info */ if (NULL != mylock.info) { for (n = 0; n < mylock.ninfo; n++) { if (PMIX_CHECK_KEY(&mylock.info[n], PMIX_SET_ENVAR) || PMIX_CHECK_KEY(&mylock.info[n], PMIX_ADD_ENVAR) || PMIX_CHECK_KEY(&mylock.info[n], PMIX_UNSET_ENVAR) || PMIX_CHECK_KEY(&mylock.info[n], PMIX_PREPEND_ENVAR) || PMIX_CHECK_KEY(&mylock.info[n], PMIX_APPEND_ENVAR)) { PMIX_INFO_LIST_XFER(ret, jinfo, &mylock.info[n]); } } PMIX_INFO_FREE(mylock.info, mylock.ninfo); } /* mark that we harvested envars so prte knows not to do it again */ PMIX_INFO_LIST_ADD(ret, jinfo, PMIX_ENVARS_HARVESTED, NULL, PMIX_BOOL); /* they want to run an application, so let's parse * the cmd line to get it */ if (PRTE_SUCCESS != (rc = prte_parse_locals(schizo, &apps, pargv, NULL, NULL))) { PRTE_ERROR_LOG(rc); PMIX_LIST_DESTRUCT(&apps); goto DONE; } /* bozo check */ if (0 == pmix_list_get_size(&apps)) { pmix_output(0, "No application specified!"); goto DONE; } /* convert the job info into an array */ PMIX_INFO_LIST_CONVERT(ret, jinfo, &darray); iptr = (pmix_info_t *) darray.array; ninfo = darray.size; PMIX_INFO_LIST_RELEASE(jinfo); /* convert the apps to an array */ napps = pmix_list_get_size(&apps); PMIX_APP_CREATE(papps, napps); n = 0; PMIX_LIST_FOREACH(app, &apps, prte_pmix_app_t) { papps[n].cmd = strdup(app->app.cmd); papps[n].argv = PMIX_ARGV_COPY_COMPAT(app->app.argv); papps[n].env = PMIX_ARGV_COPY_COMPAT(app->app.env); papps[n].cwd = strdup(app->app.cwd); papps[n].maxprocs = app->app.maxprocs; PMIX_INFO_LIST_CONVERT(ret, app->info, &darray); papps[n].info = (pmix_info_t *) darray.array; papps[n].ninfo = darray.size; ++n; } PMIX_LIST_DESTRUCT(&apps); if (verbose) { pmix_output(0, "Calling PMIx_Spawn"); } ret = PMIx_Spawn(iptr, ninfo, papps, napps, spawnednspace); if (PRTE_SUCCESS != ret) { pmix_output(0, "PMIx_Spawn failed (%d): %s", ret, PMIx_Error_string(ret)); rc = ret; goto DONE; } /* register to receive the ready-for-debug event - the internal * event library can relay it to any tool connected to us */ PRTE_PMIX_CONSTRUCT_LOCK(&lock); code = PMIX_READY_FOR_DEBUG; n = 0; PMIX_INFO_CREATE(iptr, 2); PMIX_INFO_LOAD(&iptr[n], PMIX_EVENT_HDLR_NAME, "READY-FOR-DEBUG", PMIX_STRING); ++n; PMIX_LOAD_PROCID(&pname, spawnednspace, PMIX_RANK_WILDCARD); PMIX_INFO_LOAD(&iptr[n], PMIX_EVENT_AFFECTED_PROC, &pname, PMIX_PROC); PMIx_Register_event_handler(&code, 1, iptr, 2, debug_cbfunc, regcbfunc, (void *) &lock); PRTE_PMIX_WAIT_THREAD(&lock); PRTE_PMIX_DESTRUCT_LOCK(&lock); PMIX_INFO_FREE(iptr, 2); /* check what user wants us to do with stdin */ PMIX_LOAD_NSPACE(pname.nspace, spawnednspace); opt = pmix_cmd_line_get_param(results, PRTE_CLI_STDIN); if (NULL != opt) { if (0 == strcmp(opt->values[0], "all")) { pname.rank = PMIX_RANK_WILDCARD; } else if (0 == strcmp(opt->values[0], "none")) { pname.rank = PMIX_RANK_INVALID; } else { pname.rank = 0; } } else { pname.rank = 0; } if (PMIX_RANK_INVALID != pname.rank) { PMIX_INFO_CREATE(iptr, 1); PMIX_INFO_LOAD(&iptr[0], PMIX_IOF_PUSH_STDIN, NULL, PMIX_BOOL); PRTE_PMIX_CONSTRUCT_LOCK(&lock); ret = PMIx_IOF_push(&pname, 1, NULL, iptr, 1, opcbfunc, &lock); if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) { pmix_output(0, "IOF push of stdin failed: %s", PMIx_Error_string(ret)); } else if (PMIX_SUCCESS == ret) { PRTE_PMIX_WAIT_THREAD(&lock); } PRTE_PMIX_DESTRUCT_LOCK(&lock); PMIX_INFO_FREE(iptr, 1); } /* register to be notified when * our job completes */ ret = PMIX_EVENT_JOB_END; /* setup the info */ ninfo = 3; PMIX_INFO_CREATE(iptr, ninfo); /* give the handler a name */ PMIX_INFO_LOAD(&iptr[0], PMIX_EVENT_HDLR_NAME, "JOB_TERMINATION_EVENT", PMIX_STRING); /* specify we only want to be notified when our * job terminates */ PMIX_LOAD_PROCID(&pname, spawnednspace, PMIX_RANK_WILDCARD); PMIX_INFO_LOAD(&iptr[1], PMIX_EVENT_AFFECTED_PROC, &pname, PMIX_PROC); /* request that they return our lock object */ PMIX_INFO_LOAD(&iptr[2], PMIX_EVENT_RETURN_OBJECT, &rellock, PMIX_POINTER); /* do the registration */ PRTE_PMIX_CONSTRUCT_LOCK(&lock); PMIx_Register_event_handler(&ret, 1, iptr, ninfo, evhandler, regcbfunc, &lock); PRTE_PMIX_WAIT_THREAD(&lock); PRTE_PMIX_DESTRUCT_LOCK(&lock); if (verbose) { pmix_output(0, "JOB %s EXECUTING", PRTE_JOBID_PRINT(spawnednspace)); } PRTE_PMIX_WAIT_THREAD(&rellock); /* save the status */ rc = rellock.status; /* output any message */ if (NULL != rellock.msg) { fprintf(stderr, "%s\n", rellock.msg); } /* if we lost connection to the server, then we are done */ if (PMIX_ERR_LOST_CONNECTION == rc || PMIX_ERR_UNREACH == rc) { goto DONE; } /* deregister our event handler */ PRTE_PMIX_CONSTRUCT_LOCK(&lock); PMIx_Deregister_event_handler(evid, opcbfunc, &lock); PRTE_PMIX_WAIT_THREAD(&lock); PRTE_PMIX_DESTRUCT_LOCK(&lock); PRTE_PMIX_DESTRUCT_LOCK(&rellock); /* close the push of our stdin */ PMIX_INFO_LOAD(&info, PMIX_IOF_COMPLETE, NULL, PMIX_BOOL); PRTE_PMIX_CONSTRUCT_LOCK(&lock); ret = PMIx_IOF_push(NULL, 0, NULL, &info, 1, opcbfunc, &lock); if (PMIX_SUCCESS != ret && PMIX_OPERATION_SUCCEEDED != ret) { pmix_output(0, "IOF close of stdin failed: %s", PMIx_Error_string(ret)); } else if (PMIX_SUCCESS == ret) { PRTE_PMIX_WAIT_THREAD(&lock); } PRTE_PMIX_DESTRUCT_LOCK(&lock); PMIX_INFO_DESTRUCT(&info); DONE: PMIX_LIST_FOREACH(evitm, &forwarded_signals, prte_event_list_item_t) { prte_event_signal_del(&evitm->ev); } PMIX_LIST_DESTRUCT(&forwarded_signals); if (NULL != papps) { PMIX_APP_FREE(papps, napps); } /* cleanup and leave */ ret = PMIx_tool_finalize(); if (PMIX_SUCCESS != ret) { // Since the user job has probably exited by // now, let's preserve its return code and print // a warning here, if prte logging is on. pmix_output(0, "PMIx_tool_finalize() failed. Status = %d", ret); } return rc; } static void signal_forward_callback(int signum) { pmix_status_t rc; pmix_proc_t proc; pmix_info_t info; if (verbose) { fprintf(stderr, "%s: Forwarding signal %d to job\n", prte_tool_basename, signum); } /* send the signal out to the processes */ PMIX_LOAD_PROCID(&proc, spawnednspace, PMIX_RANK_WILDCARD); PMIX_INFO_LOAD(&info, PMIX_JOB_CTRL_SIGNAL, &signum, PMIX_INT); rc = PMIx_Job_control(&proc, 1, &info, 1, NULL, NULL); if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) { fprintf(stderr, "Signal %d could not be sent to job %s (returned %s)", signum, spawnednspace, PMIx_Error_string(rc)); } }