/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2013-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2013-2017 Inria.  All rights reserved.
 * Copyright (c) 2015      Bull SAS.  All rights reserved.
 * Copyright (c) 2016-2019 Research Organization for Information Science
 *                         and Technology (RIST).  All rights reserved.
 * Copyright (c) 2017-2018 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
 * Copyright (c) 2019      Triad National Security, LLC.  All rights reserved.
 * Copyright (c) 2022      IBM Corporation.  All rights reserved
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "common_monitoring.h"
#include "common_monitoring_coll.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "opal/mca/base/mca_base_component_repository.h"
#include "opal/class/opal_hash_table.h"
#include "opal/util/output.h"
#include "opal/util/printf.h"
#include "opal/runtime/opal.h"
#include <math.h> /* for log10() in the size histogram */

#if SIZEOF_LONG_LONG == SIZEOF_SIZE_T
#define MCA_MONITORING_VAR_TYPE MCA_BASE_VAR_TYPE_UNSIGNED_LONG_LONG
#elif SIZEOF_LONG == SIZEOF_SIZE_T
#define MCA_MONITORING_VAR_TYPE MCA_BASE_VAR_TYPE_UNSIGNED_LONG
#endif

/*** Monitoring specific variables ***/
/* Keep track of how many components are currently using the common part */
static opal_atomic_int32_t mca_common_monitoring_hold = 0;

/* Output parameters */
int mca_common_monitoring_output_stream_id = -1;
static opal_output_stream_t mca_common_monitoring_output_stream_obj = {
    .lds_verbose_level = 0,
    .lds_want_syslog = false,
    .lds_prefix = NULL,
    .lds_suffix = NULL,
    .lds_is_debugging = true,
    .lds_want_stdout = false,
    .lds_want_stderr = true,
    .lds_want_file = false,
    .lds_want_file_append = false,
    .lds_file_suffix = NULL
};

/*** MCA params to mark the monitoring as enabled. ***/
/* This signals that the monitoring will hijack the PML, OSC and COLL */
int mca_common_monitoring_enabled = 0;
int mca_common_monitoring_current_state = 0;
/* Signals there will be an output of the monitored data at component close */
static int mca_common_monitoring_output_enabled = 0;
/* File where to output the monitored data */
static char* mca_common_monitoring_initial_filename = "";
static char* mca_common_monitoring_current_filename = NULL;

/* Arrays for storing the monitoring data */
static opal_atomic_size_t* pml_data = NULL;
static opal_atomic_size_t* pml_count = NULL;
static opal_atomic_size_t* filtered_pml_data = NULL;
static opal_atomic_size_t* filtered_pml_count = NULL;
static opal_atomic_size_t* osc_data_s = NULL;
static opal_atomic_size_t* osc_count_s = NULL;
static opal_atomic_size_t* osc_data_r = NULL;
static opal_atomic_size_t* osc_count_r = NULL;
static opal_atomic_size_t* coll_data = NULL;
static opal_atomic_size_t* coll_count = NULL;
static opal_atomic_size_t* size_histogram = NULL;
static const int max_size_histogram = 66;
static double log10_2 = 0.;

static int rank_world = -1;
static int nprocs_world = 0;

opal_hash_table_t *ompi_common_monitoring_translation_ht = NULL;

/* Reset all the monitoring arrays */
static void mca_common_monitoring_reset( void );

/* Flush the monitored data and reset the values */
static int mca_common_monitoring_flush(int fd, char* filename);

/* Retrieve the PML recorded count of messages sent */
static int mca_common_monitoring_get_pml_count(const struct mca_base_pvar_t *pvar,
                                               void *value, void *obj_handle);
/* Retrieve the PML recorded amount of data sent */
static int mca_common_monitoring_get_pml_size(const struct mca_base_pvar_t *pvar,
                                              void *value, void *obj_handle);
/* Retrieve the OSC recorded count of messages sent */
static int mca_common_monitoring_get_osc_sent_count(const struct mca_base_pvar_t *pvar,
                                                    void *value, void *obj_handle);
/* Retrieve the OSC recorded amount of data sent */
static int mca_common_monitoring_get_osc_sent_size(const struct mca_base_pvar_t *pvar,
                                                   void *value, void *obj_handle);
/* Retrieve the OSC recorded count of messages received */
static int mca_common_monitoring_get_osc_recv_count(const struct mca_base_pvar_t *pvar,
                                                    void *value, void *obj_handle);
/* Retrieve the OSC recorded amount of data received */
static int mca_common_monitoring_get_osc_recv_size(const struct mca_base_pvar_t *pvar,
                                                   void *value, void *obj_handle);
/* Retrieve the COLL recorded count of messages sent */
static int mca_common_monitoring_get_coll_count(const struct mca_base_pvar_t *pvar,
                                                void *value, void *obj_handle);
/* Retrieve the COLL recorded amount of data sent */
static int mca_common_monitoring_get_coll_size(const struct mca_base_pvar_t *pvar,
                                               void *value, void *obj_handle);

/* Set the filename where to output the monitored data */
static int mca_common_monitoring_set_flush(struct mca_base_pvar_t *pvar,
                                           const void *value, void *obj);
/* Does nothing: the pml_monitoring_flush pvar is only meaningful to write */
static int mca_common_monitoring_get_flush(const struct mca_base_pvar_t *pvar,
                                           void *value, void *obj);

/* pml_monitoring_count, pml_monitoring_size, osc_monitoring_sent_count,
 * osc_monitoring_sent_size, osc_monitoring_recv_size and
 * osc_monitoring_recv_count pvar notify function */
static int mca_common_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
                                                  mca_base_pvar_event_t event,
                                                  void *obj_handle, int *count);

/* pml_monitoring_flush pvar notify function */
static int mca_common_monitoring_notify_flush(struct mca_base_pvar_t *pvar,
                                              mca_base_pvar_event_t event,
                                              void *obj, int *count);
static int mca_common_monitoring_set_flush(struct mca_base_pvar_t *pvar,
                                           const void *value, void *obj)
{
    if( NULL != mca_common_monitoring_current_filename ) {
        free(mca_common_monitoring_current_filename);
    }

    if( NULL == *(char**)value || 0 == strlen((char*)value) ) {  /* No more output */
        mca_common_monitoring_current_filename = NULL;
    } else {
        mca_common_monitoring_current_filename = strdup((char*)value);
        if( NULL == mca_common_monitoring_current_filename )
            return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}

static int mca_common_monitoring_get_flush(const struct mca_base_pvar_t *pvar,
                                           void *value, void *obj)
{
    return OMPI_SUCCESS;
}

static int mca_common_monitoring_notify_flush(struct mca_base_pvar_t *pvar,
                                              mca_base_pvar_event_t event,
                                              void *obj, int *count)
{
    switch (event) {
    case MCA_BASE_PVAR_HANDLE_BIND:
        mca_common_monitoring_reset();
        *count = (NULL == mca_common_monitoring_current_filename
                  ? 0 : strlen(mca_common_monitoring_current_filename));
        /* fall through */
    case MCA_BASE_PVAR_HANDLE_UNBIND:
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_START:
        mca_common_monitoring_current_state = mca_common_monitoring_enabled;
        mca_common_monitoring_output_enabled = 0; /* we can't control the monitoring via MPI_T
                                                   * and expect an accurate answer upon
                                                   * MPI_Finalize. */
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_STOP:
        /* fd == 3 directs the flush to the current filename (in
         * mca_common_monitoring_flush, 1 is stdout and 2 is stderr) */
        return mca_common_monitoring_flush(3, mca_common_monitoring_current_filename);
    }
    return OMPI_ERROR;
}

static int mca_common_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
                                                  mca_base_pvar_event_t event,
                                                  void *obj_handle, int *count)
{
    switch (event) {
    case MCA_BASE_PVAR_HANDLE_BIND:
        /* Return the size of the communicator as the number of values */
        *count = ompi_comm_size ((ompi_communicator_t *) obj_handle);
        /* fall through */
    case MCA_BASE_PVAR_HANDLE_UNBIND:
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_START:
        mca_common_monitoring_current_state = mca_common_monitoring_enabled;
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_STOP:
        mca_common_monitoring_current_state = 0;
        return OMPI_SUCCESS;
    }
    return OMPI_ERROR;
}

int mca_common_monitoring_init( void )
{
    if( !mca_common_monitoring_enabled ) return OMPI_ERROR;
    if( 1 < opal_atomic_add_fetch_32(&mca_common_monitoring_hold, 1) )
        return OMPI_SUCCESS; /* Already initialized */

    const char *hostname;
    /* Initialize constant */
    log10_2 = log10(2.);
    /* Open the opal_output stream */
    hostname = opal_gethostname();
    opal_asprintf(&mca_common_monitoring_output_stream_obj.lds_prefix,
                  "[%s:%06d] monitoring: ", hostname, getpid());
    mca_common_monitoring_output_stream_id =
        opal_output_open(&mca_common_monitoring_output_stream_obj);
    /* Initialize proc translation hashtable */
    ompi_common_monitoring_translation_ht = OBJ_NEW(opal_hash_table_t);
    opal_hash_table_init(ompi_common_monitoring_translation_ht, 2048);
    return OMPI_SUCCESS;
}
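/*
 * A minimal usage sketch (not part of this file): each monitoring
 * component (pml, osc, coll) is expected to bracket its lifetime with the
 * init/finalize pair (mca_common_monitoring_init above,
 * mca_common_monitoring_finalize below). The atomic
 * mca_common_monitoring_hold counter makes the pair safe to call once per
 * component: the first caller initializes the shared state and the last
 * caller releases it. Component and function names here are illustrative.
 *
 *   static int example_component_open(void)
 *   {
 *       return mca_common_monitoring_init();  // refcount +1
 *   }
 *
 *   static int example_component_close(void)
 *   {
 *       mca_common_monitoring_finalize();     // refcount -1; last one flushes
 *       return OMPI_SUCCESS;
 *   }
 */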
void mca_common_monitoring_finalize( void )
{
    if( ! mca_common_monitoring_enabled || /* Don't release if not last */
        0 < opal_atomic_sub_fetch_32(&mca_common_monitoring_hold, 1) ) return;

    OPAL_MONITORING_PRINT_INFO("common_component_finish");
    /* Dump monitoring information */
    mca_common_monitoring_flush(mca_common_monitoring_output_enabled,
                                mca_common_monitoring_current_filename);
    /* Disable all monitoring */
    mca_common_monitoring_enabled = 0;
    /* Close the opal_output stream */
    opal_output_close(mca_common_monitoring_output_stream_id);
    free(mca_common_monitoring_output_stream_obj.lds_prefix);
    /* Free internal data structure */
    free((void *) pml_data);  /* a single allocation */
    opal_hash_table_remove_all( ompi_common_monitoring_translation_ht );
    OBJ_RELEASE(ompi_common_monitoring_translation_ht);
    mca_common_monitoring_coll_finalize();
    if( NULL != mca_common_monitoring_current_filename ) {
        free(mca_common_monitoring_current_filename);
        mca_common_monitoring_current_filename = NULL;
    }
}

int mca_common_monitoring_register(void)
{
    /* Because we are playing tricks with the component close, we should not
     * use mca_base_component_var_register but instead stay with the basic
     * version mca_base_var_register. */
    (void)mca_base_var_register("ompi", "pml", "monitoring", "enable",
                                "Enable the monitoring at the PML level. A value of 0 "
                                "will disable the monitoring (default). A value of 1 will "
                                "aggregate all monitoring information (point-to-point and "
                                "collective). Any other value will enable filtered monitoring",
                                MCA_BASE_VAR_TYPE_INT, NULL, MPI_T_BIND_NO_OBJECT,
                                MCA_BASE_VAR_FLAG_DWG, OPAL_INFO_LVL_4,
                                MCA_BASE_VAR_SCOPE_READONLY,
                                &mca_common_monitoring_enabled);

    mca_common_monitoring_current_state = mca_common_monitoring_enabled;

    (void)mca_base_var_register("ompi", "pml", "monitoring", "enable_output",
                                "Enable the PML monitoring textual output at MPI_Finalize "
                                "(it will be automatically turned off when MPIT is used to "
                                "monitor communications). This value should be different "
                                "from 0 in order for the output to be enabled (default: disabled)",
                                MCA_BASE_VAR_TYPE_INT, NULL, MPI_T_BIND_NO_OBJECT,
                                MCA_BASE_VAR_FLAG_DWG, OPAL_INFO_LVL_9,
                                MCA_BASE_VAR_SCOPE_READONLY,
                                &mca_common_monitoring_output_enabled);

    (void)mca_base_var_register("ompi", "pml", "monitoring", "filename",
                                "The name of the file where the monitoring information "
                                "should be saved (the filename will be extended with the "
                                "process rank and the \".prof\" extension). If this field "
                                "is NULL the monitoring will not be saved.",
                                MCA_BASE_VAR_TYPE_STRING, NULL, MPI_T_BIND_NO_OBJECT,
                                MCA_BASE_VAR_FLAG_DWG, OPAL_INFO_LVL_9,
                                MCA_BASE_VAR_SCOPE_READONLY,
                                &mca_common_monitoring_initial_filename);

    /* Now that the MCA variables are automatically unregistered when
     * their component closes, we need to keep a safe copy of the
     * filename.
     * Keep the copy completely separated in order to let the initial
     * filename be handled by the framework. It's easier to deal
     * with the string lifetime. */
    if( NULL != mca_common_monitoring_initial_filename ) {
        mca_common_monitoring_current_filename =
            strdup(mca_common_monitoring_initial_filename);
    }
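    /*
     * Illustrative launch command (a sketch, not normative): the three MCA
     * variables above can be set at run time, e.g.
     *
     *   mpirun --mca pml_monitoring_enable 2 \
     *          --mca pml_monitoring_enable_output 3 \
     *          --mca pml_monitoring_filename prof/output ...
     *
     * Judging from mca_common_monitoring_flush() below, enable_output is
     * reused as a file-descriptor selector at MPI_Finalize: 1 appears to
     * mean stdout, 2 stderr, and any other non-zero value the configured
     * filename.
     */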
    /* PML PVARs */
    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "flush",
                                 "Flush the monitoring information in the provided file. "
                                 "The filename is appended with the .%d.prof suffix, where "
                                 "%d is replaced with the process rank in MPI_COMM_WORLD.",
                                 OPAL_INFO_LVL_1, MCA_BASE_PVAR_CLASS_GENERIC,
                                 MCA_BASE_VAR_TYPE_STRING, NULL, MPI_T_BIND_NO_OBJECT,
                                 MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_flush,
                                 mca_common_monitoring_set_flush,
                                 mca_common_monitoring_notify_flush, NULL);

    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count",
                                 "Number of messages sent to each peer through the PML framework.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_pml_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size",
                                 "Size of messages sent to each peer in a communicator "
                                 "through the PML framework.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_pml_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    /* OSC PVARs */
    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_sent_count",
                                 "Number of messages sent through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_sent_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_sent_size",
                                 "Size of messages sent through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_sent_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_recv_count",
                                 "Number of messages received through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_recv_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_recv_size",
                                 "Size of messages received through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_recv_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    /* COLL PVARs */
    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "messages_count",
                                 "Number of messages exchanged through the COLL framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_coll_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "messages_size",
                                 "Size of messages exchanged through the COLL framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_coll_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);
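    /*
     * A hedged sketch of reading one of the per-peer pvars above through
     * the standard MPI_T interface (buffer names are illustrative; error
     * handling elided):
     *
     *   int idx, count, provided;
     *   MPI_T_pvar_session session;
     *   MPI_T_pvar_handle  handle;
     *   size_t *counts;
     *
     *   MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
     *   MPI_T_pvar_get_index("pml_monitoring_messages_count",
     *                        MPI_T_PVAR_CLASS_SIZE, &idx);
     *   MPI_T_pvar_session_create(&session);
     *   // Binding to a communicator sizes the handle to one slot per peer
     *   // (see mca_common_monitoring_comm_size_notify above).
     *   MPI_T_pvar_handle_alloc(session, idx, MPI_COMM_WORLD, &handle, &count);
     *   counts = malloc(count * sizeof(size_t));
     *   MPI_T_pvar_start(session, handle);
     *   // ... communications to be monitored ...
     *   MPI_T_pvar_read(session, handle, counts); // counts[i] = msgs sent to rank i
     *   MPI_T_pvar_stop(session, handle);
     *   MPI_T_pvar_handle_free(session, &handle);
     *   MPI_T_pvar_session_free(&session);
     */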
(void)mca_base_pvar_register("ompi", "coll", "monitoring", "o2a_count", "Number of messages " "exchanged as one-to-all operations in a communicator.", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_COUNTER, MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM, MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG, mca_common_monitoring_coll_get_o2a_count, NULL, mca_common_monitoring_coll_messages_notify, NULL); (void)mca_base_pvar_register("ompi", "coll", "monitoring", "o2a_size", "Size of messages " "exchanged as one-to-all operations in a communicator.", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_AGGREGATE, MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM, MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG, mca_common_monitoring_coll_get_o2a_size, NULL, mca_common_monitoring_coll_messages_notify, NULL); (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2o_count", "Number of messages " "exchanged as all-to-one operations in a communicator.", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_COUNTER, MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM, MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG, mca_common_monitoring_coll_get_a2o_count, NULL, mca_common_monitoring_coll_messages_notify, NULL); (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2o_size", "Size of messages " "exchanged as all-to-one operations in a communicator.", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_AGGREGATE, MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM, MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG, mca_common_monitoring_coll_get_a2o_size, NULL, mca_common_monitoring_coll_messages_notify, NULL); (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2a_count", "Number of messages " "exchanged as all-to-all operations in a communicator.", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_COUNTER, MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM, MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG, mca_common_monitoring_coll_get_a2a_count, NULL, mca_common_monitoring_coll_messages_notify, NULL); (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2a_size", "Size of messages " "exchanged as all-to-all operations in a communicator.", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_AGGREGATE, MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM, MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG, mca_common_monitoring_coll_get_a2a_size, NULL, mca_common_monitoring_coll_messages_notify, NULL); return OMPI_SUCCESS; } /** * This PML monitors only the processes in the MPI_COMM_WORLD. As OMPI is now lazily * adding peers on the first call to add_procs we need to check how many processes * are in the MPI_COMM_WORLD to create the storage with the right size. 
/**
 * This PML monitors only the processes in MPI_COMM_WORLD. As OMPI is now lazily
 * adding peers on the first call to add_procs, we need to check how many processes
 * are in MPI_COMM_WORLD to create the storage with the right size.
 */
int mca_common_monitoring_add_procs(struct ompi_proc_t **procs, size_t nprocs)
{
    opal_process_name_t tmp, wp_name;
    size_t i;
    int peer_rank;
    uint64_t key;

    if( 0 > rank_world )
        rank_world = ompi_comm_rank((ompi_communicator_t*)&ompi_mpi_comm_world);
    if( !nprocs_world )
        nprocs_world = ompi_comm_size((ompi_communicator_t*)&ompi_mpi_comm_world);

    if( NULL == pml_data ) {
        /* A single allocation backs the 10 per-peer counter arrays plus the
         * per-peer size histogram; the pointers below are views into it. */
        int array_size = (10 + max_size_histogram) * nprocs_world;
        pml_data = (opal_atomic_size_t*)calloc(array_size, sizeof(size_t));
        pml_count          = pml_data + nprocs_world;
        filtered_pml_data  = pml_count + nprocs_world;
        filtered_pml_count = filtered_pml_data + nprocs_world;
        osc_data_s         = filtered_pml_count + nprocs_world;
        osc_count_s        = osc_data_s + nprocs_world;
        osc_data_r         = osc_count_s + nprocs_world;
        osc_count_r        = osc_data_r + nprocs_world;
        coll_data          = osc_count_r + nprocs_world;
        coll_count         = coll_data + nprocs_world;
        size_histogram     = coll_count + nprocs_world;
    }

    /* For all procs in the same MPI_COMM_WORLD we need to add them to the hash table */
    for( i = 0; i < nprocs; i++ ) {
        /* Extract the peer procname from the procs array */
        if( ompi_proc_is_sentinel(procs[i]) ) {
            tmp = ompi_proc_sentinel_to_name((uintptr_t)procs[i]);
        } else {
            tmp = procs[i]->super.proc_name;
        }
        if( tmp.jobid != ompi_proc_local_proc->super.proc_name.jobid )
            continue;

        /* each process will only be added once, so there is no way it
         * already exists in the hash */
        for( peer_rank = 0; peer_rank < nprocs_world; peer_rank++ ) {
            wp_name = ompi_group_get_proc_name(((ompi_communicator_t*)&ompi_mpi_comm_world)->c_remote_group,
                                               peer_rank);
            if( 0 != opal_compare_proc( tmp, wp_name ) )
                continue;

            key = *((uint64_t*)&tmp);
            /* save the rank of the process in MPI_COMM_WORLD in the hash
             * using the proc_name as the key */
            if( OPAL_SUCCESS != opal_hash_table_set_value_uint64(ompi_common_monitoring_translation_ht,
                                                                 key, (void*)(uintptr_t)peer_rank) ) {
                return OMPI_ERR_OUT_OF_RESOURCE; /* failed to allocate memory or growing the hash table */
            }
            break;
        }
    }
    return OMPI_SUCCESS;
}

static void mca_common_monitoring_reset( void )
{
    int array_size = (10 + max_size_histogram) * nprocs_world;
    memset((void *) pml_data, 0, array_size * sizeof(size_t));
    mca_common_monitoring_coll_reset();
}

void mca_common_monitoring_record_pml(int world_rank, size_t data_size, int tag)
{
    if( 0 == mca_common_monitoring_current_state ) return; /* right now the monitoring is not started */

    /* Keep track of the data_size distribution */
    if( 0 == data_size ) {
        opal_atomic_add_fetch_size_t(&size_histogram[world_rank * max_size_histogram], 1);
    } else {
        int log2_size = log10(data_size)/log10_2;
        if(log2_size > max_size_histogram - 2) /* Avoid out-of-bound write */
            log2_size = max_size_histogram - 2;
        opal_atomic_add_fetch_size_t(&size_histogram[world_rank * max_size_histogram + log2_size + 1], 1);
    }

    /* distinguish positive and negative tags if requested */
    if( (tag < 0) && (mca_common_monitoring_filter()) ) {
        opal_atomic_add_fetch_size_t(&filtered_pml_data[world_rank], data_size);
        opal_atomic_add_fetch_size_t(&filtered_pml_count[world_rank], 1);
    } else { /* if filtered monitoring is not activated data is aggregated indifferently */
        opal_atomic_add_fetch_size_t(&pml_data[world_rank], data_size);
        opal_atomic_add_fetch_size_t(&pml_count[world_rank], 1);
    }
}
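/*
 * Worked example of the histogram indexing above (an illustration derived
 * from the code): each peer owns max_size_histogram (66) slots, indices 0
 * to 65. Slot 0 counts zero-byte messages; a message of data_size bytes
 * lands in slot floor(log2(data_size)) + 1. For instance, a 1024-byte
 * message gives log10(1024)/log10(2) = 10, hence slot 11. The clamp on
 * log2_size means messages of 2^64 bytes and beyond all fall into the
 * last slot (65).
 */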
&ompi_mpi_comm_world.comm || NULL == pml_count) return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = pml_count[i]; } return OMPI_SUCCESS; } static int mca_common_monitoring_get_pml_size(const struct mca_base_pvar_t *pvar, void *value, void *obj_handle) { ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle; int comm_size = ompi_comm_size (comm); size_t *values = (size_t*) value; int i; if(comm != &ompi_mpi_comm_world.comm || NULL == pml_data) return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = pml_data[i]; } return OMPI_SUCCESS; } void mca_common_monitoring_record_osc(int world_rank, size_t data_size, enum mca_monitoring_osc_direction dir) { if( 0 == mca_common_monitoring_current_state ) return; /* right now the monitoring is not started */ if( SEND == dir ) { opal_atomic_add_fetch_size_t(&osc_data_s[world_rank], data_size); opal_atomic_add_fetch_size_t(&osc_count_s[world_rank], 1); } else { opal_atomic_add_fetch_size_t(&osc_data_r[world_rank], data_size); opal_atomic_add_fetch_size_t(&osc_count_r[world_rank], 1); } } static int mca_common_monitoring_get_osc_sent_count(const struct mca_base_pvar_t *pvar, void *value, void *obj_handle) { ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle; int i, comm_size = ompi_comm_size (comm); size_t *values = (size_t*) value; if(comm != &ompi_mpi_comm_world.comm || NULL == pml_count) return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = osc_count_s[i]; } return OMPI_SUCCESS; } static int mca_common_monitoring_get_osc_sent_size(const struct mca_base_pvar_t *pvar, void *value, void *obj_handle) { ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle; int comm_size = ompi_comm_size (comm); size_t *values = (size_t*) value; int i; if(comm != &ompi_mpi_comm_world.comm || NULL == pml_data) return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = osc_data_s[i]; } return OMPI_SUCCESS; } static int mca_common_monitoring_get_osc_recv_count(const struct mca_base_pvar_t *pvar, void *value, void *obj_handle) { ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle; int i, comm_size = ompi_comm_size (comm); size_t *values = (size_t*) value; if(comm != &ompi_mpi_comm_world.comm || NULL == pml_count) return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = osc_count_r[i]; } return OMPI_SUCCESS; } static int mca_common_monitoring_get_osc_recv_size(const struct mca_base_pvar_t *pvar, void *value, void *obj_handle) { ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle; int comm_size = ompi_comm_size (comm); size_t *values = (size_t*) value; int i; if(comm != &ompi_mpi_comm_world.comm || NULL == pml_data) return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = osc_data_r[i]; } return OMPI_SUCCESS; } void mca_common_monitoring_record_coll(int world_rank, size_t data_size) { if( 0 == mca_common_monitoring_current_state ) return; /* right now the monitoring is not started */ opal_atomic_add_fetch_size_t(&coll_data[world_rank], data_size); opal_atomic_add_fetch_size_t(&coll_count[world_rank], 1); } static int mca_common_monitoring_get_coll_count(const struct mca_base_pvar_t *pvar, void *value, void *obj_handle) { ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle; int i, comm_size = ompi_comm_size (comm); size_t *values = (size_t*) value; if(comm != &ompi_mpi_comm_world.comm || NULL == pml_count) return OMPI_ERROR; for (i = 0 ; i < comm_size ; ++i) { values[i] = coll_count[i]; } return OMPI_SUCCESS; } static int 
static int mca_common_monitoring_get_coll_size(const struct mca_base_pvar_t *pvar,
                                               void *value, void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;
    int i;

    if(comm != &ompi_mpi_comm_world.comm || NULL == pml_data)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = coll_data[i];
    }
    return OMPI_SUCCESS;
}

static void mca_common_monitoring_output( FILE *pf, int my_rank, int nbprocs )
{
    /* Dump outgoing messages */
    fprintf(pf, "# POINT TO POINT\n");
    for (int i = 0 ; i < nbprocs ; i++) {
        if(pml_count[i] > 0) {
            fprintf(pf, "E\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent\t",
                    my_rank, i, pml_data[i], pml_count[i]);
            for(int j = 0 ; j < max_size_histogram ; ++j)
                fprintf(pf, "%zu%s", size_histogram[i * max_size_histogram + j],
                        j < max_size_histogram - 1 ? "," : "\n");
        }
    }
    /* Dump outgoing synchronization/collective messages */
    if( mca_common_monitoring_filter() ) {
        for (int i = 0 ; i < nbprocs ; i++) {
            if(filtered_pml_count[i] > 0) {
                fprintf(pf, "I\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent%s",
                        my_rank, i, filtered_pml_data[i], filtered_pml_count[i],
                        0 == pml_count[i] ? "\t" : "\n");
                /*
                 * If no external messages were exchanged between the two
                 * processes, the histogram has not yet been dumped. Then we
                 * need to add it at the end of the internal category.
                 */
                if(0 == pml_count[i]) {
                    for(int j = 0 ; j < max_size_histogram ; ++j)
                        fprintf(pf, "%zu%s", size_histogram[i * max_size_histogram + j],
                                j < max_size_histogram - 1 ? "," : "\n");
                }
            }
        }
    }
    /* Dump one-sided (OSC) messages */
    fprintf(pf, "# OSC\n");
    for (int i = 0 ; i < nbprocs ; i++) {
        if(osc_count_s[i] > 0) {
            fprintf(pf, "S\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n",
                    my_rank, i, osc_data_s[i], osc_count_s[i]);
        }
        if(osc_count_r[i] > 0) {
            fprintf(pf, "R\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n",
                    my_rank, i, osc_data_r[i], osc_count_r[i]);
        }
    }
    /* Dump collectives */
    fprintf(pf, "# COLLECTIVES\n");
    for (int i = 0 ; i < nbprocs ; i++) {
        if(coll_count[i] > 0) {
            fprintf(pf, "C\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n",
                    my_rank, i, coll_data[i], coll_count[i]);
        }
    }
    mca_common_monitoring_coll_flush_all(pf);
}
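/*
 * For illustration, the dump produced above looks roughly like the
 * following (values invented; one line per peer with traffic; the
 * trailing comma-separated list on "E" lines is the 66-slot size
 * histogram, abbreviated here):
 *
 *   # POINT TO POINT
 *   E    0    1    2048 bytes    2 msgs sent    0,0,1,...,0
 *   # OSC
 *   S    0    1    1024 bytes    1 msgs sent
 *   R    0    1    512 bytes     1 msgs sent
 *   # COLLECTIVES
 *   C    0    1    4096 bytes    4 msgs sent
 */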
/*
 * Flush the monitoring into filename.
 * Useful for phases (see example in test/monitoring).
 */
static int mca_common_monitoring_flush(int fd, char* filename)
{
    /* If we are not driven by MPI_T then dump the monitoring information */
    if( 0 == mca_common_monitoring_current_state || 0 == fd ) /* if disabled do nothing */
        return OMPI_SUCCESS;

    if( 1 == fd ) {
        OPAL_MONITORING_PRINT_INFO("Proc %" PRId32 " flushing monitoring to stdout", rank_world);
        mca_common_monitoring_output( stdout, rank_world, nprocs_world );
    } else if( 2 == fd ) {
        OPAL_MONITORING_PRINT_INFO("Proc %" PRId32 " flushing monitoring to stderr", rank_world);
        mca_common_monitoring_output( stderr, rank_world, nprocs_world );
    } else {
        FILE *pf = NULL;
        char* tmpfn = NULL;

        if( NULL == filename ) { /* No filename */
            OPAL_MONITORING_PRINT_ERR("Error while flushing: no filename provided");
            return OMPI_ERROR;
        } else {
            opal_asprintf(&tmpfn, "%s.%" PRId32 ".prof", filename, rank_world);
            pf = fopen(tmpfn, "w");
            free(tmpfn);
        }

        if(NULL == pf) { /* Error during open */
            OPAL_MONITORING_PRINT_ERR("Error while flushing to: %s.%" PRId32 ".prof",
                                      filename, rank_world);
            return OMPI_ERROR;
        }
        OPAL_MONITORING_PRINT_INFO("Proc %d flushing monitoring to: %s.%" PRId32 ".prof",
                                   rank_world, filename, rank_world);
        mca_common_monitoring_output( pf, rank_world, nprocs_world );
        fclose(pf);
    }
    /* Reset to 0 all monitored data */
    mca_common_monitoring_reset();
    return OMPI_SUCCESS;
}