/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2014 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2015-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2015-2016 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2020 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ * * In windows, many of the socket functions return an EWOULDBLOCK * instead of \ things like EAGAIN, EINPROGRESS, etc. It has been * verified that this will \ not conflict with other error codes that * are returned by these functions \ under UNIX/Linux environments */ #include "opal_config.h" #ifdef HAVE_SYS_TYPES_H # include #endif #ifdef HAVE_SYS_UIO_H # include #endif #ifdef HAVE_NET_UIO_H # include #endif #ifdef HAVE_UNISTD_H # include #endif /* HAVE_UNISTD_H */ #include "opal/mca/btl/base/btl_base_error.h" #include "opal/opal_socket_errno.h" #include "opal/util/proc.h" #include "opal/util/show_help.h" #include "btl_tcp_endpoint.h" #include "btl_tcp_frag.h" #include "btl_tcp_proc.h" static void mca_btl_tcp_frag_eager_constructor(mca_btl_tcp_frag_t *frag) { frag->size = mca_btl_tcp_module.super.btl_eager_limit; frag->my_list = &mca_btl_tcp_component.tcp_frag_eager; } static void mca_btl_tcp_frag_max_constructor(mca_btl_tcp_frag_t *frag) { frag->size = mca_btl_tcp_module.super.btl_max_send_size; frag->my_list = &mca_btl_tcp_component.tcp_frag_max; } static void mca_btl_tcp_frag_user_constructor(mca_btl_tcp_frag_t *frag) { frag->size = 0; frag->my_list = &mca_btl_tcp_component.tcp_frag_user; } OBJ_CLASS_INSTANCE(mca_btl_tcp_frag_t, mca_btl_base_descriptor_t, NULL, NULL); OBJ_CLASS_INSTANCE(mca_btl_tcp_frag_eager_t, mca_btl_base_descriptor_t, mca_btl_tcp_frag_eager_constructor, NULL); OBJ_CLASS_INSTANCE(mca_btl_tcp_frag_max_t, mca_btl_base_descriptor_t, mca_btl_tcp_frag_max_constructor, NULL); OBJ_CLASS_INSTANCE(mca_btl_tcp_frag_user_t, mca_btl_base_descriptor_t, mca_btl_tcp_frag_user_constructor, NULL); size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t *frag, char *msg, char *buf, size_t length) { int i, used; used = snprintf(buf, length, "%s frag %p iov_cnt %d iov_idx %d size %lu\n", msg, (void *) frag, (int) frag->iov_cnt, (int) frag->iov_idx, frag->size); if ((size_t) used >= length) { return length; } for (i = 0; i < (int) frag->iov_cnt; i++) { used += snprintf(&buf[used], length - used, "[%s%p:%lu] ", (i < (int) frag->iov_idx ? "*" : ""), frag->iov[i].iov_base, frag->iov[i].iov_len); if ((size_t) used >= length) { return length; } } return used; } bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t *frag, int sd) { ssize_t cnt; size_t i, num_vecs; /* non-blocking write, but continue if interrupted */ do { cnt = writev(sd, frag->iov_ptr, frag->iov_cnt); if (cnt < 0) { switch (opal_socket_errno) { case EINTR: continue; case EWOULDBLOCK: return false; case EFAULT: BTL_ERROR(("mca_btl_tcp_frag_send: writev error (%p, %lu)\n\t%s(%lu)\n", frag->iov_ptr[0].iov_base, (unsigned long) frag->iov_ptr[0].iov_len, strerror(opal_socket_errno), (unsigned long) frag->iov_cnt)); /* send_lock held by caller */ frag->endpoint->endpoint_state = MCA_BTL_TCP_FAILED; mca_btl_tcp_endpoint_close(frag->endpoint); return false; default: BTL_PEER_ERROR(frag->endpoint->endpoint_proc->proc_opal, ("mca_btl_tcp_frag_send: writev failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno)); /* send_lock held by caller */ frag->endpoint->endpoint_state = MCA_BTL_TCP_FAILED; mca_btl_tcp_endpoint_close(frag->endpoint); return false; } } } while (cnt < 0); /* if the write didn't complete - update the iovec state */ num_vecs = frag->iov_cnt; for (i = 0; i < num_vecs; i++) { if (cnt >= (ssize_t) frag->iov_ptr->iov_len) { cnt -= frag->iov_ptr->iov_len; frag->iov_ptr++; frag->iov_idx++; frag->iov_cnt--; } else { frag->iov_ptr->iov_base = (opal_iov_base_ptr_t)( ((unsigned char *) frag->iov_ptr->iov_base) + cnt); frag->iov_ptr->iov_len -= cnt; OPAL_OUTPUT_VERBOSE((100, opal_btl_base_framework.framework_output, "%s:%d write %ld bytes on socket %d\n", __FILE__, __LINE__, cnt, sd)); break; } } return (frag->iov_cnt == 0); } bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t *frag, int sd) { mca_btl_base_endpoint_t *btl_endpoint = frag->endpoint; ssize_t cnt; int32_t i, num_vecs, dont_copy_data = 0; char *errhost; repeat: num_vecs = frag->iov_cnt; #if MCA_BTL_TCP_ENDPOINT_CACHE if (0 != btl_endpoint->endpoint_cache_length) { size_t length; /* It's strange at the first look but cnt have to be set to the full amount of data * available. After going to advance_iov_position we will use cnt to detect if there * is still some data pending. */ cnt = length = btl_endpoint->endpoint_cache_length; for (i = 0; i < (int) frag->iov_cnt; i++) { if (length > frag->iov_ptr[i].iov_len) { length = frag->iov_ptr[i].iov_len; } if ((0 == dont_copy_data) || (length < frag->iov_ptr[i].iov_len)) { memcpy(frag->iov_ptr[i].iov_base, btl_endpoint->endpoint_cache_pos, length); } else { frag->segments[0].seg_addr.pval = btl_endpoint->endpoint_cache_pos; frag->iov_ptr[i].iov_base = btl_endpoint->endpoint_cache_pos; } btl_endpoint->endpoint_cache_pos += length; btl_endpoint->endpoint_cache_length -= length; length = btl_endpoint->endpoint_cache_length; if (0 == length) { btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache; break; } } goto advance_iov_position; } /* What's happens if all iovecs are used by the fragment ? It still work, as we reserve one * iovec for the caching in the fragment structure (the +1). */ frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache_pos; frag->iov_ptr[num_vecs].iov_len = mca_btl_tcp_component.tcp_endpoint_cache - btl_endpoint->endpoint_cache_length; num_vecs++; #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ /* non-blocking read, but continue if interrupted */ do { cnt = readv(sd, frag->iov_ptr, num_vecs); if (0 < cnt) { goto advance_iov_position; } if (cnt == 0) { OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); if (MCA_BTL_TCP_CONNECTED == btl_endpoint->endpoint_state) { btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED; } mca_btl_tcp_endpoint_close(btl_endpoint); OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); return false; } switch (opal_socket_errno) { case EINTR: continue; case EWOULDBLOCK: return false; case EFAULT: BTL_ERROR(("mca_btl_tcp_frag_recv: readv error (%p, %lu)\n\t%s(%lu)\n", frag->iov_ptr[0].iov_base, (unsigned long) frag->iov_ptr[0].iov_len, strerror(opal_socket_errno), (unsigned long) frag->iov_cnt)); break; case ECONNRESET: if (mca_btl_base_warn_peer_error || mca_btl_base_verbose > 0) { errhost = opal_get_proc_hostname(btl_endpoint->endpoint_proc->proc_opal); opal_show_help("help-mpi-btl-tcp.txt", "peer hung up", true, opal_process_info.nodename, getpid(), errhost); free(errhost); } break; default: BTL_PEER_ERROR(frag->endpoint->endpoint_proc->proc_opal, ("mca_btl_tcp_frag_recv: readv failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno)); break; } OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED; mca_btl_tcp_endpoint_close(btl_endpoint); OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); return false; } while (cnt < 0); advance_iov_position: /* if the read didn't complete - update the iovec state */ num_vecs = frag->iov_cnt; for (i = 0; i < num_vecs; i++) { if (cnt < (ssize_t) frag->iov_ptr->iov_len) { frag->iov_ptr->iov_base = (opal_iov_base_ptr_t)( ((unsigned char *) frag->iov_ptr->iov_base) + cnt); frag->iov_ptr->iov_len -= cnt; cnt = 0; break; } cnt -= frag->iov_ptr->iov_len; frag->iov_idx++; frag->iov_ptr++; frag->iov_cnt--; } #if MCA_BTL_TCP_ENDPOINT_CACHE btl_endpoint->endpoint_cache_length = cnt; #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ /* read header */ if (frag->iov_cnt == 0) { if (btl_endpoint->endpoint_nbo && frag->iov_idx == 1) { MCA_BTL_TCP_HDR_NTOH(frag->hdr); } switch (frag->hdr.type) { case MCA_BTL_TCP_HDR_TYPE_FIN: frag->endpoint->endpoint_state = MCA_BTL_TCP_CLOSED; mca_btl_tcp_endpoint_close(frag->endpoint); break; case MCA_BTL_TCP_HDR_TYPE_SEND: if (frag->iov_idx == 1 && frag->hdr.size) { frag->segments[0].seg_addr.pval = frag + 1; frag->segments[0].seg_len = frag->hdr.size; frag->iov[1].iov_base = (IOVBASE_TYPE *) (frag->segments[0].seg_addr.pval); frag->iov[1].iov_len = frag->hdr.size; frag->iov_cnt++; goto repeat; } break; case MCA_BTL_TCP_HDR_TYPE_PUT: if (frag->iov_idx == 1) { frag->iov[1].iov_base = (IOVBASE_TYPE *) frag->segments; frag->iov[1].iov_len = frag->hdr.count * sizeof(mca_btl_base_segment_t); frag->iov_cnt++; goto repeat; } else if (frag->iov_idx == 2) { for (i = 0; i < frag->hdr.count; i++) { if (btl_endpoint->endpoint_nbo) { MCA_BTL_BASE_SEGMENT_NTOH(frag->segments[i]); } frag->iov[i + 2].iov_base = (IOVBASE_TYPE *) frag->segments[i].seg_addr.pval; frag->iov[i + 2].iov_len = frag->segments[i].seg_len; } frag->iov_cnt += frag->hdr.count; goto repeat; } break; case MCA_BTL_TCP_HDR_TYPE_GET: default: break; } return true; } return false; }