From 3a7516f3fa7cbde40fe32a845f72767b55e1e764 Mon Sep 17 00:00:00 2001
From: Howard Pritchard
Date: Mon, 14 Jul 2025 11:00:02 -0600
Subject: [PATCH 1/4] support for mpi_comm_attach_buffer and friends

Add support for the buffer management functions for buffered send mode
that were added in the MPI 4.1 standard:

MPI_Comm_attach_buffer
MPI_Comm_detach_buffer
MPI_Comm_flush_buffer
MPI_Comm_iflush_buffer
MPI_Session_attach_buffer
MPI_Session_detach_buffer
MPI_Session_flush_buffer
MPI_Session_iflush_buffer
MPI_Buffer_flush
MPI_Buffer_iflush

The MPI_BUFFER_AUTOMATIC buffer option is also supported.

Full support for non-blocking flush is deferred to a subsequent PR to
avoid reviewer overload.

Related to #12074

Signed-off-by: Howard Pritchard
---
 ompi/communicator/communicator.h      |  17 +-
 ompi/include/mpi.h.in                 |  29 ++
 ompi/instance/instance.c              |   2 +-
 ompi/instance/instance.h              |  18 +-
 ompi/mca/pml/base/pml_base_bsend.c    | 561 +++++++++++++++++++++-----
 ompi/mca/pml/base/pml_base_bsend.h    |  39 +-
 ompi/mca/pml/cm/pml_cm_sendreq.h      |   7 +-
 ompi/mca/pml/ucx/pml_ucx.c            |  28 +-
 ompi/mca/pml/ucx/pml_ucx_request.c    |   4 +-
 ompi/mpi/c/Makefile.am                |  10 +
 ompi/mpi/c/buffer_attach.c.in         |   2 +-
 ompi/mpi/c/buffer_detach.c.in         |   6 +-
 ompi/mpi/c/buffer_flush.c.in          |  44 ++
 ompi/mpi/c/buffer_iflush.c.in         |  49 +++
 ompi/mpi/c/comm_attach_buffer.c.in    |  49 +++
 ompi/mpi/c/comm_detach_buffer.c.in    |  57 +++
 ompi/mpi/c/comm_flush_buffer.c.in     |  46 +++
 ompi/mpi/c/comm_iflush_buffer.c.in    |  50 +++
 ompi/mpi/c/session_attach_buffer.c.in |  53 +++
 ompi/mpi/c/session_detach_buffer.c.in |  61 +++
 ompi/mpi/c/session_flush_buffer.c.in  |  50 +++
 ompi/mpi/c/session_iflush_buffer.c.in |  54 +++
 22 files changed, 1104 insertions(+), 132 deletions(-)
 create mode 100644 ompi/mpi/c/buffer_flush.c.in
 create mode 100644 ompi/mpi/c/buffer_iflush.c.in
 create mode 100644 ompi/mpi/c/comm_attach_buffer.c.in
 create mode 100644 ompi/mpi/c/comm_detach_buffer.c.in
 create mode 100644 ompi/mpi/c/comm_flush_buffer.c.in
 create mode 100644 ompi/mpi/c/comm_iflush_buffer.c.in
 create mode 100644 ompi/mpi/c/session_attach_buffer.c.in
 create mode 100644 ompi/mpi/c/session_detach_buffer.c.in
 create mode 100644 ompi/mpi/c/session_flush_buffer.c.in
 create mode 100644 ompi/mpi/c/session_iflush_buffer.c.in
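Usage sketch (illustrative only, not part of the patch; assumes an
existing communicator `comm`, a hypothetical BUFSIZE, and omits error
checking):

    char *buf;
    int size = BUFSIZE + MPI_BSEND_OVERHEAD;
    buf = malloc(size);
    MPI_Comm_attach_buffer(comm, buf, size);
    MPI_Bsend(msg, count, MPI_CHAR, dest, tag, comm);  /* packed into buf */
    MPI_Comm_flush_buffer(comm);                /* blocks until buffered sends drain */
    MPI_Comm_detach_buffer(comm, &buf, &size);  /* also waits on pending sends */
    free(buf);

Attaching MPI_BUFFER_AUTOMATIC instead of user memory asks the library to
allocate buffer segments on demand, e.g.
MPI_Session_attach_buffer(session, MPI_BUFFER_AUTOMATIC, 0).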
diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h
index 4a1b42ab0c4..c18b47eaf70 100644
--- a/ompi/communicator/communicator.h
+++ b/ompi/communicator/communicator.h
@@ -22,7 +22,7 @@
  * Copyright (c) 2015      Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
- * Copyright (c) 2018-2024 Triad National Security, LLC. All rights
+ * Copyright (c) 2018-2025 Triad National Security, LLC. All rights
  *                         reserved.
  * Copyright (c) 2023      Advanced Micro Devices, Inc. All rights reserved.
  * Copyright (c) 2024      NVIDIA Corporation. All rights reserved.
@@ -345,6 +345,9 @@ struct ompi_communicator_t {
     /* instance that this comm belongs to */
     ompi_instance_t* instance;
 
+    /* pointer to buffer object used for buffered sends */
+    void *bsend_buffer;
+
 #if OPAL_ENABLE_FT_MPI
     /** agreement caching info for topology and previous returned decisions */
     opal_object_t *agreement_specific;
@@ -775,6 +778,18 @@ static inline bool ompi_comm_iface_create_check(ompi_communicator_t *comm, int *
     return ompi_comm_iface_coll_check(comm, err);
 }
 
+static inline void *ompi_comm_bsend_buffer_get(ompi_communicator_t *comm)
+{
+    assert(NULL != comm);
+    return comm->bsend_buffer;
+}
+
+static inline int ompi_comm_bsend_buffer_set(ompi_communicator_t *comm, void *buffer)
+{
+    comm->bsend_buffer = buffer;
+    return OMPI_SUCCESS;
+}
+
 /*
  * Communicator creation support collectives
  * - Agreement style allreduce
diff --git a/ompi/include/mpi.h.in b/ompi/include/mpi.h.in
index e838fe66061..1fbcba8ec75 100644
--- a/ompi/include/mpi.h.in
+++ b/ompi/include/mpi.h.in
@@ -564,6 +564,7 @@ typedef MPI_Win_errhandler_function MPI_Win_errhandler_fn
 #define MPI_WEIGHTS_EMPTY    ((int *) 3)   /* empty weights */
 #define MPI_BOTTOM           ((void *) 0)  /* base reference address */
 #define MPI_IN_PLACE         ((void *) 1)  /* in place buffer */
+#define MPI_BUFFER_AUTOMATIC ((void *) 4)  /* automatic buffering for the buffer attach functions (MPI 4.1) */
 #define MPI_BSEND_OVERHEAD   128           /* size of bsend header + ptr */
 #define MPI_MAX_INFO_KEY     OPAL_MAX_INFO_KEY  /* max info key length */
 #define MPI_MAX_INFO_VAL     OPAL_MAX_INFO_VAL  /* max info value length */
@@ -1574,6 +1575,8 @@ OMPI_DECLSPEC int MPI_Buffer_attach(void *buffer, int size);
 OMPI_DECLSPEC int MPI_Buffer_attach_c(void *buffer, MPI_Count size);
 OMPI_DECLSPEC int MPI_Buffer_detach(void *buffer, int *size);
 OMPI_DECLSPEC int MPI_Buffer_detach_c(void *buffer, MPI_Count *size);
+OMPI_DECLSPEC int MPI_Buffer_flush(void);
+OMPI_DECLSPEC int MPI_Buffer_iflush(MPI_Request *request);
 OMPI_DECLSPEC int MPI_Cancel(MPI_Request *request);
 OMPI_DECLSPEC int MPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[]);
 OMPI_DECLSPEC int MPI_Cart_create(MPI_Comm old_comm, int ndims, const int dims[],
@@ -1637,11 +1640,17 @@ OMPI_DECLSPEC int MPI_Dist_graph_neighbors_count(MPI_Comm comm,
                                                  int *inneighbors,
                                                  int *outneighbors,
                                                  int *weighted);
+OMPI_DECLSPEC int MPI_Comm_attach_buffer(MPI_Comm comm, void *buffer, int size);
+OMPI_DECLSPEC int MPI_Comm_attach_buffer_c(MPI_Comm comm, void *buffer, MPI_Count size);
+OMPI_DECLSPEC int MPI_Comm_detach_buffer(MPI_Comm comm, void *buffer_addr, int *size);
+OMPI_DECLSPEC int MPI_Comm_detach_buffer_c(MPI_Comm comm, void *buffer_addr, MPI_Count *size);
+OMPI_DECLSPEC int MPI_Comm_flush_buffer(MPI_Comm comm);
 OMPI_DECLSPEC int MPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler *erhandler);
 OMPI_DECLSPEC int MPI_Comm_get_info(MPI_Comm comm, MPI_Info *info_used);
 OMPI_DECLSPEC int MPI_Comm_get_name(MPI_Comm comm, char *comm_name, int *resultlen);
 OMPI_DECLSPEC int MPI_Comm_get_parent(MPI_Comm *parent);
 OMPI_DECLSPEC int MPI_Comm_group(MPI_Comm comm, MPI_Group *group);
+OMPI_DECLSPEC int MPI_Comm_iflush_buffer(MPI_Comm comm, MPI_Request *request);
 OMPI_DECLSPEC int MPI_Comm_join(int fd, MPI_Comm *intercomm);
 OMPI_DECLSPEC int MPI_Comm_rank(MPI_Comm comm, int *rank);
 OMPI_DECLSPEC int MPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group);
@@ -2331,15 +2340,21 @@ OMPI_DECLSPEC int MPI_Sendrecv_replace_c(void * buf, MPI_Count count, MPI_Datat
                                          int dest, int sendtag, int source, int recvtag,
                                          MPI_Comm comm, MPI_Status *status);
 OMPI_DECLSPEC MPI_Fint MPI_Session_c2f (const MPI_Session session);
+OMPI_DECLSPEC int MPI_Session_attach_buffer(MPI_Session session, void *buffer, int size);
+OMPI_DECLSPEC int MPI_Session_attach_buffer_c(MPI_Session session, void *buffer, MPI_Count size);
 OMPI_DECLSPEC int MPI_Session_call_errhandler(MPI_Session session, int errorcode);
 OMPI_DECLSPEC int MPI_Session_create_errhandler (MPI_Session_errhandler_function *session_errhandler_fn,
                                                  MPI_Errhandler *errhandler);
+OMPI_DECLSPEC int MPI_Session_detach_buffer(MPI_Session session, void *buffer_addr, int *size);
+OMPI_DECLSPEC int MPI_Session_detach_buffer_c(MPI_Session session, void *buffer_addr, MPI_Count *size);
 OMPI_DECLSPEC int MPI_Session_finalize (MPI_Session *session);
+OMPI_DECLSPEC int MPI_Session_flush_buffer(MPI_Session session);
 OMPI_DECLSPEC int MPI_Session_get_errhandler(MPI_Session session, MPI_Errhandler *erhandler);
 OMPI_DECLSPEC int MPI_Session_get_info (MPI_Session session, MPI_Info *info_used);
 OMPI_DECLSPEC int MPI_Session_get_num_psets (MPI_Session session, MPI_Info info, int *npset_names);
 OMPI_DECLSPEC int MPI_Session_get_nth_pset (MPI_Session session, MPI_Info info, int n, int *len, char *pset_name);
 OMPI_DECLSPEC int MPI_Session_get_pset_info (MPI_Session session, const char *pset_name, MPI_Info *info_used);
+OMPI_DECLSPEC int MPI_Session_iflush_buffer(MPI_Session session, MPI_Request *request);
 OMPI_DECLSPEC int MPI_Session_init (MPI_Info info, MPI_Errhandler errhandler,
                                     MPI_Session *session);
 OMPI_DECLSPEC MPI_Session MPI_Session_f2c (MPI_Fint session);
@@ -2733,6 +2748,8 @@ OMPI_DECLSPEC int PMPI_Buffer_attach(void *buffer, int size);
 OMPI_DECLSPEC int PMPI_Buffer_attach_c(void *buffer, MPI_Count size);
 OMPI_DECLSPEC int PMPI_Buffer_detach(void *buffer, int *size);
 OMPI_DECLSPEC int PMPI_Buffer_detach_c(void *buffer, MPI_Count *size);
+OMPI_DECLSPEC int PMPI_Buffer_flush(void);
+OMPI_DECLSPEC int PMPI_Buffer_iflush(MPI_Request *request);
 OMPI_DECLSPEC int PMPI_Cancel(MPI_Request *request);
 OMPI_DECLSPEC int PMPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[]);
 OMPI_DECLSPEC int PMPI_Cart_create(MPI_Comm old_comm, int ndims, const int dims[],
@@ -2796,11 +2813,17 @@ OMPI_DECLSPEC int PMPI_Dist_graph_neighbors_count(MPI_Comm comm,
                                                   int *inneighbors,
                                                   int *outneighbors,
                                                   int *weighted);
+OMPI_DECLSPEC int PMPI_Comm_attach_buffer(MPI_Comm comm, void *buffer, int size);
+OMPI_DECLSPEC int PMPI_Comm_attach_buffer_c(MPI_Comm comm, void *buffer, MPI_Count size);
+OMPI_DECLSPEC int PMPI_Comm_detach_buffer(MPI_Comm comm, void *buffer_addr, int *size);
+OMPI_DECLSPEC int PMPI_Comm_detach_buffer_c(MPI_Comm comm, void *buffer_addr, MPI_Count *size);
+OMPI_DECLSPEC int PMPI_Comm_flush_buffer(MPI_Comm comm);
 OMPI_DECLSPEC int PMPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler *erhandler);
 OMPI_DECLSPEC int PMPI_Comm_get_info(MPI_Comm comm, MPI_Info *info_used);
 OMPI_DECLSPEC int PMPI_Comm_get_name(MPI_Comm comm, char *comm_name, int *resultlen);
 OMPI_DECLSPEC int PMPI_Comm_get_parent(MPI_Comm *parent);
 OMPI_DECLSPEC int PMPI_Comm_group(MPI_Comm comm, MPI_Group *group);
+OMPI_DECLSPEC int PMPI_Comm_iflush_buffer(MPI_Comm comm, MPI_Request *request);
 OMPI_DECLSPEC int PMPI_Comm_join(int fd, MPI_Comm *intercomm);
 OMPI_DECLSPEC int PMPI_Comm_rank(MPI_Comm comm, int *rank);
 OMPI_DECLSPEC int PMPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group);
@@ -3490,15 +3513,21 @@ OMPI_DECLSPEC int PMPI_Sendrecv_replace_c(void * buf, MPI_Count count, MPI_Data
                                           int dest, int sendtag, int source, int recvtag,
                                           MPI_Comm comm, MPI_Status *status);
 OMPI_DECLSPEC MPI_Fint PMPI_Session_c2f (const MPI_Session session);
+OMPI_DECLSPEC int PMPI_Session_attach_buffer(MPI_Session session, void *buffer, int size);
+OMPI_DECLSPEC int PMPI_Session_attach_buffer_c(MPI_Session session, void *buffer, MPI_Count size);
 OMPI_DECLSPEC int PMPI_Session_call_errhandler(MPI_Session session, int errorcode);
 OMPI_DECLSPEC int PMPI_Session_create_errhandler (MPI_Session_errhandler_function *session_errhandler_fn,
                                                   MPI_Errhandler *errhandler);
+OMPI_DECLSPEC int PMPI_Session_detach_buffer(MPI_Session session, void *buffer_addr, int *size);
+OMPI_DECLSPEC int PMPI_Session_detach_buffer_c(MPI_Session session, void *buffer_addr, MPI_Count *size);
 OMPI_DECLSPEC int PMPI_Session_finalize (MPI_Session *session);
+OMPI_DECLSPEC int PMPI_Session_flush_buffer(MPI_Session session);
 OMPI_DECLSPEC int PMPI_Session_get_errhandler(MPI_Session session, MPI_Errhandler *erhandler);
 OMPI_DECLSPEC int PMPI_Session_get_info (MPI_Session session, MPI_Info *info_used);
 OMPI_DECLSPEC int PMPI_Session_get_num_psets (MPI_Session session, MPI_Info info, int *npset_names);
 OMPI_DECLSPEC int PMPI_Session_get_nth_pset (MPI_Session session, MPI_Info info, int n, int *len, char *pset_name);
 OMPI_DECLSPEC int PMPI_Session_get_pset_info (MPI_Session session, const char *pset_name, MPI_Info *info_used);
+OMPI_DECLSPEC int PMPI_Session_iflush_buffer(MPI_Session session, MPI_Request *request);
 OMPI_DECLSPEC int PMPI_Session_init (MPI_Info info, MPI_Errhandler errhandler,
                                      MPI_Session *session);
 OMPI_DECLSPEC MPI_Session PMPI_Session_f2c (MPI_Fint session);
diff --git a/ompi/instance/instance.c b/ompi/instance/instance.c
index 0b83e442b0c..465782f1a23 100644
--- a/ompi/instance/instance.c
+++ b/ompi/instance/instance.c
@@ -894,7 +894,7 @@ static int ompi_mpi_instance_finalize_common (void)
     /* As finalize is the last legal MPI call, we are allowed to force the release
      * of the user buffer used for bsend, before going anywhere further. */
-    (void) mca_pml_base_bsend_detach (NULL, NULL);
+    (void) mca_pml_base_bsend_detach(BASE_BSEND_BUF, NULL, NULL, NULL);
 
     /* Shut down any bindings-specific issues: C++, F77, F90 */
 
diff --git a/ompi/instance/instance.h b/ompi/instance/instance.h
index f72f19525ae..ce5fb25919c 100644
--- a/ompi/instance/instance.h
+++ b/ompi/instance/instance.h
@@ -1,6 +1,6 @@
 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
 /*
- * Copyright (c) 2018      Triad National Security, LLC. All rights reserved.
+ * Copyright (c) 2018-2025 Triad National Security, LLC. All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -41,6 +41,10 @@ struct ompi_instance_t {
 
     ompi_errhandler_t *error_handler;
     ompi_errhandler_type_t errhandler_type;
+
+    /* pointer to buffer object used for buffered sends */
+    void *bsend_buffer;
+
 };
 typedef struct ompi_instance_t ompi_instance_t;
 
@@ -164,4 +168,16 @@ static inline int ompi_instance_invalid (const ompi_instance_t* instance)
     return false;
 }
 
+static inline void *ompi_instance_bsend_buffer_get(ompi_instance_t *instance)
+{
+    assert(NULL != instance);
+    return instance->bsend_buffer;
+}
+
+static inline int ompi_instance_bsend_buffer_set(ompi_instance_t *instance, void *buffer)
+{
+    instance->bsend_buffer = buffer;
+    return OMPI_SUCCESS;
+}
+
 #endif /* !defined(OMPI_INSTANCE_H) */
diff --git a/ompi/mca/pml/base/pml_base_bsend.c b/ompi/mca/pml/base/pml_base_bsend.c
index d1c38e90209..e1361cf5360 100644
--- a/ompi/mca/pml/base/pml_base_bsend.c
+++ b/ompi/mca/pml/base/pml_base_bsend.c
@@ -29,6 +29,7 @@
 #include "opal/mca/threads/mutex.h"
 #include "opal/mca/threads/condition.h"
 #include "ompi/datatype/ompi_datatype.h"
+#include "ompi/communicator/communicator.h"
 #include "opal/mca/allocator/base/base.h"
 #include "opal/mca/allocator/allocator.h"
 #include "ompi/mca/pml/pml.h"
@@ -42,16 +43,39 @@
 #include <unistd.h>
 #endif  /* HAVE_UNISTD_H */
 
-static opal_mutex_t     mca_pml_bsend_mutex;     /* lock for thread safety */
-static opal_condition_t mca_pml_bsend_condition; /* condition variable to block on detach */
-static mca_allocator_base_component_t* mca_pml_bsend_allocator_component;
-static mca_allocator_base_module_t* mca_pml_bsend_allocator; /* sub-allocator to manage users buffer */
-static size_t           mca_pml_bsend_usersize;  /* user provided buffer size */
-unsigned char *mca_pml_bsend_userbase = NULL;    /* user provided buffer base */
-unsigned char *mca_pml_bsend_base = NULL;        /* adjusted base of user buffer */
-unsigned char *mca_pml_bsend_addr = NULL;        /* current offset into user buffer */
-static size_t           mca_pml_bsend_size;      /* adjusted size of user buffer */
-static size_t           mca_pml_bsend_count;     /* number of outstanding requests */
+static void mca_pml_bsend_buffer_construct(mca_pml_bsend_buffer_t *buffer);
+static void mca_pml_bsend_buffer_construct(mca_pml_bsend_buffer_t *buffer)
+{
+    buffer->bsend_allocator = NULL;
+    buffer->bsend_userbase = NULL;
+    buffer->bsend_base = NULL;
+    buffer->bsend_addr = NULL;
+    buffer->bsend_usersize = 0UL;
+    buffer->bsend_size = 0UL;
+    buffer->bsend_count = 0UL;
+    buffer->bsend_pagesz = 0UL;
+    buffer->bsend_pagebits = 0;
+    OBJ_CONSTRUCT(&buffer->bsend_mutex, opal_mutex_t);
+    OBJ_CONSTRUCT(&buffer->bsend_condition, opal_condition_t);
+}
+
+static void mca_pml_bsend_buffer_destruct(mca_pml_bsend_buffer_t *buffer);
+static void mca_pml_bsend_buffer_destruct(mca_pml_bsend_buffer_t *buffer)
+{
+    OBJ_DESTRUCT(&buffer->bsend_mutex);
+    OBJ_DESTRUCT(&buffer->bsend_condition);
+}
+
+OBJ_CLASS_INSTANCE(mca_pml_bsend_buffer_t, opal_object_t,
+                   mca_pml_bsend_buffer_construct,
+                   mca_pml_bsend_buffer_destruct);
 
 static size_t mca_pml_bsend_pagesz;   /* mmap page size */
 static int    mca_pml_bsend_pagebits; /* number of bits in pagesz */
 static opal_atomic_int32_t mca_pml_bsend_init = 0;
 
 /* defined in pml_base_open.c */
 extern char *ompi_pml_base_bsend_allocator_name;
 
+static mca_pml_bsend_buffer_t *mca_pml_bsend_buffer = NULL;
+static mca_allocator_base_component_t* mca_pml_bsend_allocator_component;
+
 static int mca_pml_base_bsend_fini (void);
 
+/*
+ * Routine to select which buffer to use, following the buffer precedence
+ * rules of section 3.6 of the MPI 4.1 standard
+ */
+
+static mca_pml_bsend_buffer_t *mca_pml_bsend_buffer_get(ompi_communicator_t *comm)
+{
+    mca_pml_bsend_buffer_t *buffer = NULL;
+
+    /*
+     * first see if a buffer has been attached to the communicator
+     */
+
+    buffer = ompi_comm_bsend_buffer_get(comm);
+    if (NULL != buffer) {
+        return buffer;
+    }
+
+    /*
+     * maybe the instance (aka session) has a buffer associated with it.
+     */
+
+    if (MPI_SESSION_NULL != comm->instance) {
+        buffer = ompi_instance_bsend_buffer_get(comm->instance);
+        if (NULL != buffer) {
+            return buffer;
+        }
+    }
+
+    /*
+     * okay, see if the old MPI-1 style buffer is available
+     */
+
+    if (NULL != mca_pml_bsend_buffer) {
+        return mca_pml_bsend_buffer;
+    }
+
+    return NULL;
+}
+
 /*
  * Routine to return pages to sub-allocator as needed
  */
@@ -68,17 +134,40 @@ static void* mca_pml_bsend_alloc_segment(void *ctx, size_t *size_inout)
 {
     void *addr;
     size_t size = *size_inout;
-    if(mca_pml_bsend_addr + size > mca_pml_bsend_base + mca_pml_bsend_size) {
+    mca_pml_bsend_buffer_t *buf_instance = (mca_pml_bsend_buffer_t *)ctx;
+
+    if(buf_instance->bsend_addr + size > buf_instance->bsend_base + buf_instance->bsend_size) {
         return NULL;
     }
     /* allocate all that is left */
-    size = mca_pml_bsend_size - (mca_pml_bsend_addr - mca_pml_bsend_base);
-    addr = mca_pml_bsend_addr;
-    mca_pml_bsend_addr += size;
+    size = buf_instance->bsend_size - (buf_instance->bsend_addr - buf_instance->bsend_base);
+    addr = buf_instance->bsend_addr;
+    buf_instance->bsend_addr += size;
     *size_inout = size;
     return addr;
 }
 
+/*
+ * Routines to implement MPI_BUFFER_AUTOMATIC (MPI 4.1)
+ */
+
+static void* mca_pml_bsend_alloc_seg_auto(void *ctx, size_t *size_inout)
+{
+    void *addr;
+    size_t seg_size = (1024UL * 1024UL);
+
+    /* honor requests larger than the default segment size */
+    if (*size_inout > seg_size) {
+        seg_size = *size_inout;
+    }
+
+    addr = malloc(seg_size);
+    if (NULL == addr) {
+        return NULL;
+    }
+
+    *size_inout = seg_size;
+    return addr;
+}
+
+static void mca_pml_bsend_dealloc_seg_auto(void *ctx, void *segment)
+{
+    free(segment);
+}
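+
+/*
+ * Illustrative note (an assumption for clarity, not from the original
+ * patch): with MPI_BUFFER_AUTOMATIC the user supplies no memory of their
+ * own; segments are malloc'ed on demand by the callbacks above and freed
+ * when the buffer object is detached, e.g.
+ *
+ *   MPI_Session_attach_buffer(session, MPI_BUFFER_AUTOMATIC, 0);
+ *   MPI_Bsend(...);   // packs into a malloc'ed segment
+ *   MPI_Session_detach_buffer(session, &addr, &size);
+ */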
 
 /*
  * One time initialization at startup
  */
@@ -89,12 +178,8 @@ int mca_pml_base_bsend_init (void)
     if(OPAL_THREAD_ADD_FETCH32(&mca_pml_bsend_init, 1) > 1)
         return OMPI_SUCCESS;
 
-    /* initialize static objects */
-    OBJ_CONSTRUCT(&mca_pml_bsend_mutex, opal_mutex_t);
-    OBJ_CONSTRUCT(&mca_pml_bsend_condition, opal_condition_t);
-
     /* lookup name of the allocator to use for buffered sends */
     if(NULL == (mca_pml_bsend_allocator_component = mca_allocator_component_lookup(ompi_pml_base_bsend_allocator_name))) {
         return OMPI_ERR_BUFFER;
     }
 
@@ -120,38 +205,84 @@ static int mca_pml_base_bsend_fini (void)
     if(OPAL_THREAD_ADD_FETCH32(&mca_pml_bsend_init,-1) > 0)
         return OMPI_SUCCESS;
 
-    if(NULL != mca_pml_bsend_allocator)
-        mca_pml_bsend_allocator->alc_finalize(mca_pml_bsend_allocator);
-    mca_pml_bsend_allocator = NULL;
+    mca_pml_bsend_allocator_component = NULL;
+    mca_pml_bsend_buffer = NULL;
+
+    mca_pml_bsend_pagebits = 0;
 
-    OBJ_DESTRUCT(&mca_pml_bsend_condition);
-    OBJ_DESTRUCT(&mca_pml_bsend_mutex);
     return OMPI_SUCCESS;
 }
 
 /*
- * User-level call to attach buffer.
+ * User-level call to attach buffer
  */
-int mca_pml_base_bsend_attach(void* addr, size_t size)
+int mca_pml_base_bsend_attach(ompi_bsend_buffer_type_t type, void *obj, void* addr, size_t size)
 {
     int align;
+    int ret = OMPI_SUCCESS;
+    mca_pml_bsend_buffer_t *buf_instance;
+    ompi_communicator_t *comm;
+    ompi_instance_t *instance;
 
-    if(NULL == addr || size <= 0) {
+    if(NULL == addr || ((MPI_BUFFER_AUTOMATIC != addr) && size <= 0)) {
         return OMPI_ERR_BUFFER;
     }
 
-    /* check for buffer already attached */
-    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
-    if(NULL != mca_pml_bsend_allocator) {
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
-        return OMPI_ERR_BUFFER;
+    /*
+     * check if the object already has a buffer associated with it
+     */
+
+    switch (type) {
+    case BASE_BSEND_BUF:
+        if (NULL != mca_pml_bsend_buffer) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case COMM_BSEND_BUF:
+        comm = (ompi_communicator_t *)obj;
+        if (NULL != ompi_comm_bsend_buffer_get(comm)) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case SESSION_BSEND_BUF:
+        instance = (ompi_instance_t *)obj;
+        if (NULL != ompi_instance_bsend_buffer_get(instance)) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    default:
+        /* This should not happen */
+        assert(0);
+        ret = OMPI_ERR_BAD_PARAM;
+        goto fn_exit;
+        break;
     }
+
+    buf_instance = OBJ_NEW(mca_pml_bsend_buffer_t);
 
     /* try to create an instance of the allocator - to determine thread safety level */
-    mca_pml_bsend_allocator = mca_pml_bsend_allocator_component->allocator_init(ompi_mpi_thread_multiple, mca_pml_bsend_alloc_segment, NULL, NULL);
-    if(NULL == mca_pml_bsend_allocator) {
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+    if (MPI_BUFFER_AUTOMATIC != addr) {
+        buf_instance->bsend_allocator = mca_pml_bsend_allocator_component->allocator_init(ompi_mpi_thread_multiple,
+                                                                                          mca_pml_bsend_alloc_segment,
+                                                                                          NULL,
+                                                                                          buf_instance);
+    } else {
+        buf_instance->bsend_allocator = mca_pml_bsend_allocator_component->allocator_init(ompi_mpi_thread_multiple,
+                                                                                          mca_pml_bsend_alloc_seg_auto,
+                                                                                          mca_pml_bsend_dealloc_seg_auto,
+                                                                                          buf_instance);
+    }
+    if(NULL == buf_instance->bsend_allocator) {
+        OBJ_RELEASE(buf_instance);
         return OMPI_ERR_BUFFER;
     }
 
     /*
      * Save away what the user handed in. This is done in case the
      * base and size are modified for alignment issues.
      */
-    mca_pml_bsend_userbase = (unsigned char*)addr;
-    mca_pml_bsend_usersize = size;
+    buf_instance->bsend_userbase = (unsigned char*)addr;
+    buf_instance->bsend_usersize = size;
     /*
      * Align to pointer boundaries. The bsend overhead is large enough
      * to account for this. Compute any alignment that needs to be done.
     */
@@ -168,51 +299,258 @@ int mca_pml_base_bsend_attach(void* addr, size_t size)
     align = sizeof(void *) - ((size_t)addr & (sizeof(void *) - 1));
 
     /* setup local variables */
-    mca_pml_bsend_base = (unsigned char *)addr + align;
-    mca_pml_bsend_addr = (unsigned char *)addr + align;
-    mca_pml_bsend_size = size - align;
-    mca_pml_bsend_count = 0;
-    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
-    return OMPI_SUCCESS;
+    buf_instance->bsend_base = (unsigned char *)addr + align;
+    buf_instance->bsend_addr = (unsigned char *)addr + align;
+    buf_instance->bsend_size = size - align;
+    buf_instance->bsend_count = 0;
+    buf_instance->bsend_pagebits = mca_pml_bsend_pagebits;
+
+    switch (type) {
+    case BASE_BSEND_BUF:
+        mca_pml_bsend_buffer = buf_instance;
+        break;
+
+    case COMM_BSEND_BUF:
+        ret = ompi_comm_bsend_buffer_set(comm, buf_instance);
+        if(OMPI_SUCCESS != ret) {
+            goto fn_exit;
+        }
+        OBJ_RETAIN(comm);
+        break;
+
+    case SESSION_BSEND_BUF:
+        ret = ompi_instance_bsend_buffer_set(instance, buf_instance);
+        if(OMPI_SUCCESS != ret) {
+            goto fn_exit;
+        }
+        OBJ_RETAIN(instance);
+        break;
+
+    default:
+        /* This should not happen */
+        assert(0);
+        break;
+    }
+
+fn_exit:
+    return ret;
 }
 
 /*
  * User-level call to detach buffer
  */
-int mca_pml_base_bsend_detach(void* addr, size_t* size)
+int mca_pml_base_bsend_detach(ompi_bsend_buffer_type_t type, void *obj, void* addr, size_t* size)
 {
-    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
-
-    /* is buffer attached */
-    if(NULL == mca_pml_bsend_allocator) {
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
-        return OMPI_ERR_BUFFER;
+    int ret = OMPI_SUCCESS;
+    mca_pml_bsend_buffer_t *buf_instance = NULL;
+    ompi_communicator_t *comm = NULL;
+    ompi_instance_t *instance = NULL;
+
+    switch (type) {
+    case BASE_BSEND_BUF:
+        buf_instance = mca_pml_bsend_buffer;
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case COMM_BSEND_BUF:
+        comm = (ompi_communicator_t *)obj;
+        buf_instance = ompi_comm_bsend_buffer_get(comm);
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case SESSION_BSEND_BUF:
+        instance = (ompi_instance_t *)obj;
+        buf_instance = ompi_instance_bsend_buffer_get(instance);
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    default:
+        /* This should not happen */
+        assert(0);
+        break;
     }
 
-    /* wait on any pending requests */
-    while(mca_pml_bsend_count != 0)
-        opal_condition_wait(&mca_pml_bsend_condition, &mca_pml_bsend_mutex);
+    /* wait on any pending requests */
+    OPAL_THREAD_LOCK(&buf_instance->bsend_mutex);
+    while(buf_instance->bsend_count != 0) {
+        opal_condition_wait(&buf_instance->bsend_condition, &buf_instance->bsend_mutex);
+    }
+    OPAL_THREAD_UNLOCK(&buf_instance->bsend_mutex);
 
     /* free resources associated with the allocator */
-    mca_pml_bsend_allocator->alc_finalize(mca_pml_bsend_allocator);
-    mca_pml_bsend_allocator = NULL;
+    buf_instance->bsend_allocator->alc_finalize(buf_instance->bsend_allocator);
+    buf_instance->bsend_allocator = NULL;
 
     /* return current settings */
     if(NULL != addr)
-        *((void**)addr) = mca_pml_bsend_userbase;
+        *((void**)addr) = buf_instance->bsend_userbase;
     if(NULL != size)
-        *size = mca_pml_bsend_usersize;
-
-    /* reset local variables */
-    mca_pml_bsend_userbase = NULL;
-    mca_pml_bsend_usersize = 0;
-    mca_pml_bsend_base = NULL;
-    mca_pml_bsend_addr = NULL;
-    mca_pml_bsend_size = 0;
-    mca_pml_bsend_count = 0;
-    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+        *size = buf_instance->bsend_usersize;
+
+    switch (type) {
+    case BASE_BSEND_BUF:
+        mca_pml_bsend_buffer = NULL;
+        break;
+
+    case COMM_BSEND_BUF:
+        ret = ompi_comm_bsend_buffer_set(comm, NULL);
+        if (OMPI_SUCCESS != ret) {
+            goto fn_exit;
+        }
+        OBJ_RELEASE(comm);
+        break;
+
+    case SESSION_BSEND_BUF:
+        ret = ompi_instance_bsend_buffer_set(instance, NULL);
+        if (OMPI_SUCCESS != ret) {
+            goto fn_exit;
+        }
+        OBJ_RELEASE(instance);
+        break;
+
+    default:
+        /* This should not happen */
+        assert(0);
+        break;
+    }
+
+    OBJ_RELEASE(buf_instance);
+
+fn_exit:
+    return ret;
+}
+
+int mca_pml_base_bsend_flush(ompi_bsend_buffer_type_t type, void *obj)
+{
+    int ret = OMPI_SUCCESS;
+    mca_pml_bsend_buffer_t *buf_instance = NULL;
+    ompi_communicator_t *comm = NULL;
+    ompi_instance_t *instance = NULL;
+
+    switch (type) {
+    case BASE_BSEND_BUF:
+        buf_instance = mca_pml_bsend_buffer;
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case COMM_BSEND_BUF:
+        comm = (ompi_communicator_t *)obj;
+        buf_instance = ompi_comm_bsend_buffer_get(comm);
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case SESSION_BSEND_BUF:
+        instance = (ompi_instance_t *)obj;
+        buf_instance = ompi_instance_bsend_buffer_get(instance);
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    default:
+        /* This should not happen */
+        assert(0);
+        break;
+    }
+
+    /* wait for pending buffered sends drawing on this buffer to complete */
+    OPAL_THREAD_LOCK(&buf_instance->bsend_mutex);
+    while(buf_instance->bsend_count != 0) {
+        opal_condition_wait(&buf_instance->bsend_condition, &buf_instance->bsend_mutex);
+    }
+    OPAL_THREAD_UNLOCK(&buf_instance->bsend_mutex);
+
+fn_exit:
+    return ret;
+}
+
+#if 0
+static int mca_pml_base_bsend_request_free(ompi_request_t ** request)
+{
+    (*request)->req_state = OMPI_REQUEST_INVALID;
+    OBJ_RELEASE(*request);
+    *request = MPI_REQUEST_NULL;
     return OMPI_SUCCESS;
 }
+#endif
+
+/*
+ * TODO: right now treating this as blocking; full non-blocking support
+ * is deferred to a subsequent PR.
+ */
+int mca_pml_base_bsend_iflush(ompi_bsend_buffer_type_t type, void *obj, ompi_request_t **req)
+{
+    int ret = OMPI_SUCCESS;
+    mca_pml_bsend_buffer_t *buf_instance = NULL;
+    ompi_communicator_t *comm = NULL;
+    ompi_instance_t *instance = NULL;
+
+    switch (type) {
+    case BASE_BSEND_BUF:
+        buf_instance = mca_pml_bsend_buffer;
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case COMM_BSEND_BUF:
+        comm = (ompi_communicator_t *)obj;
+        buf_instance = ompi_comm_bsend_buffer_get(comm);
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    case SESSION_BSEND_BUF:
+        instance = (ompi_instance_t *)obj;
+        buf_instance = ompi_instance_bsend_buffer_get(instance);
+        if (NULL == buf_instance) {
+            ret = OMPI_ERR_BUFFER;
+            goto fn_exit;
+        }
+        break;
+
+    default:
+        /* This should not happen */
+        assert(0);
+        break;
+    }
+
+#if 0
+    /* Set up request */
+    ompi_request_t *flush_request = OBJ_NEW(ompi_request_t);
+    OMPI_REQUEST_INIT(flush_request, 0);
+    flush_request->req_type = OMPI_REQUEST_BUF;
+    flush_request->req_free = mca_pml_base_bsend_request_free;
+    flush_request->req_status = (ompi_status_public_t){0};
+#endif
+
+    /* blocking for now: wait for pending buffered sends to complete */
+    OPAL_THREAD_LOCK(&buf_instance->bsend_mutex);
+    while(buf_instance->bsend_count != 0) {
+        opal_condition_wait(&buf_instance->bsend_condition, &buf_instance->bsend_mutex);
+    }
+    OPAL_THREAD_UNLOCK(&buf_instance->bsend_mutex);
+
+fn_exit:
+    *req = MPI_REQUEST_NULL;
+    return ret;
+}
 
 /*
@@ -222,6 +560,7 @@ int mca_pml_base_bsend_detach(void* addr, size_t* size)
 int mca_pml_base_bsend_request_start(ompi_request_t* request)
 {
     mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
+    mca_pml_bsend_buffer_t *buffer = NULL;
     struct iovec iov;
     unsigned int iov_count;
     size_t max_data;
@@ -230,24 +569,24 @@ int mca_pml_base_bsend_request_start(ompi_request_t* request)
 
     if(sendreq->req_bytes_packed > 0) {
 
         /* has a buffer been provided */
-        OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
-        if(NULL == mca_pml_bsend_addr) {
+        buffer = mca_pml_bsend_buffer_get(request->req_mpi_object.comm);
+        if (NULL == buffer) {
             sendreq->req_addr = NULL;
-            OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
             return OMPI_ERR_BUFFER;
         }
 
         /* allocate a buffer to hold packed message */
-        sendreq->req_addr = mca_pml_bsend_allocator->alc_alloc(
-            mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0);
+        OPAL_THREAD_LOCK(&buffer->bsend_mutex);
+        sendreq->req_addr = buffer->bsend_allocator->alc_alloc(
+            buffer->bsend_allocator, sendreq->req_bytes_packed, 0);
         if(NULL == sendreq->req_addr) {
             /* release resources when request is freed */
             sendreq->req_base.req_pml_complete = true;
-            OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+            OPAL_THREAD_UNLOCK(&buffer->bsend_mutex);
             return OMPI_ERR_BUFFER;
         }
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+        OPAL_THREAD_UNLOCK(&buffer->bsend_mutex);
 
         /* The convertor is already initialized in the beginning so we just have to
          * pack the data in the newly allocated buffer.
@@ -267,9 +606,8 @@ int mca_pml_base_bsend_request_start(ompi_request_t* request)
         opal_convertor_prepare_for_send( &sendreq->req_base.req_convertor,
                                          &(ompi_mpi_packed.dt.super), max_data, sendreq->req_addr );
         /* increment count of pending requests */
-        mca_pml_bsend_count++;
+        buffer->bsend_count++;
     }
-
     return OMPI_SUCCESS;
 }
 
@@ -281,24 +619,26 @@ int mca_pml_base_bsend_request_start(ompi_request_t* request)
 int mca_pml_base_bsend_request_alloc(ompi_request_t* request)
 {
     mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
+    mca_pml_bsend_buffer_t *buffer = NULL;
 
     assert( sendreq->req_bytes_packed > 0 );
 
     /* has a buffer been provided */
-    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
-    if(NULL == mca_pml_bsend_addr) {
+    buffer = mca_pml_bsend_buffer_get(request->req_mpi_object.comm);
+    if (NULL == buffer) {
         sendreq->req_addr = NULL;
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
        return OMPI_ERR_BUFFER;
     }
 
+    OPAL_THREAD_LOCK(&buffer->bsend_mutex);
+
     /* allocate a buffer to hold packed message */
-    sendreq->req_addr = mca_pml_bsend_allocator->alc_alloc(
-        mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0);
+    sendreq->req_addr = buffer->bsend_allocator->alc_alloc(
+        buffer->bsend_allocator, sendreq->req_bytes_packed, 0);
     if(NULL == sendreq->req_addr) {
         /* release resources when request is freed */
         sendreq->req_base.req_pml_complete = true;
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+        OPAL_THREAD_UNLOCK(&buffer->bsend_mutex);
         /* progress communications, with the hope that more resources
          * will be freed */
         opal_progress();
@@ -306,8 +646,8 @@ int mca_pml_base_bsend_request_alloc(ompi_request_t* request)
     }
 
     /* increment count of pending requests */
-    mca_pml_bsend_count++;
-    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+    buffer->bsend_count++;
+    OPAL_THREAD_UNLOCK(&buffer->bsend_mutex);
 
     return OMPI_SUCCESS;
 }
 
@@ -316,22 +656,24 @@ int mca_pml_base_bsend_request_alloc(ompi_request_t* request)
  * allocate buffer
  */
 
-void* mca_pml_base_bsend_request_alloc_buf( size_t length )
+void* mca_pml_base_bsend_request_alloc_buf(ompi_communicator_t *comm, size_t length )
 {
     void* buf = NULL;
+    mca_pml_bsend_buffer_t *ompi_buffer = NULL;
+
     /* has a buffer been provided */
-    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
-    if(NULL == mca_pml_bsend_addr) {
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+    ompi_buffer = mca_pml_bsend_buffer_get(comm);
+    if (NULL == ompi_buffer) {
         return NULL;
     }
+    OPAL_THREAD_LOCK(&ompi_buffer->bsend_mutex);
 
     /* allocate a buffer to hold packed message */
-    buf = mca_pml_bsend_allocator->alc_alloc(
-        mca_pml_bsend_allocator, length, 0);
+    buf = ompi_buffer->bsend_allocator->alc_alloc(
+        ompi_buffer->bsend_allocator, length, 0);
     if(NULL == buf) {
         /* release resources when request is freed */
-        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+        OPAL_THREAD_UNLOCK(&ompi_buffer->bsend_mutex);
         /* progress communications, with the hope that more resources
          * will be freed */
         opal_progress();
@@ -339,8 +681,8 @@ void* mca_pml_base_bsend_request_alloc_buf( size_t length )
     }
 
     /* increment count of pending requests */
-    mca_pml_bsend_count++;
-    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+    ompi_buffer->bsend_count++;
+    OPAL_THREAD_UNLOCK(&ompi_buffer->bsend_mutex);
 
     return buf;
 }
 
@@ -349,19 +691,26 @@ void* mca_pml_base_bsend_request_alloc_buf( size_t length )
 /*
  * Request completed - free buffer and decrement pending count
  */
-int mca_pml_base_bsend_request_free(void* addr)
+int mca_pml_base_bsend_request_free(ompi_communicator_t *comm, void* addr)
 {
-    /* remove from list of pending requests */
-    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
+    mca_pml_bsend_buffer_t *buffer = NULL;
+
+    buffer = mca_pml_bsend_buffer_get(comm);
+    assert(NULL != buffer);
+    if (NULL == buffer) {
+        return OMPI_ERR_BUFFER;
+    }
+
+    OPAL_THREAD_LOCK(&buffer->bsend_mutex);
 
     /* free buffer */
-    mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, addr);
+    buffer->bsend_allocator->alc_free(buffer->bsend_allocator, addr);
 
-    /* decrement count of buffered requests */
-    if(--mca_pml_bsend_count == 0)
-        opal_condition_signal(&mca_pml_bsend_condition);
+    /* decrement count of buffered requests for this buffer pool */
+    if(--buffer->bsend_count == 0)
+        opal_condition_signal(&buffer->bsend_condition);
 
-    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+    OPAL_THREAD_UNLOCK(&buffer->bsend_mutex);
     return OMPI_SUCCESS;
 }
 
@@ -372,6 +721,8 @@ int mca_pml_base_bsend_request_free(void* addr)
  */
 int mca_pml_base_bsend_request_fini(ompi_request_t* request)
 {
+    mca_pml_bsend_buffer_t *buffer = mca_pml_bsend_buffer_get(request->req_mpi_object.comm);
+
     mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
     if(sendreq->req_bytes_packed == 0 ||
        sendreq->req_addr == NULL ||
@@ -379,17 +730,17 @@ int mca_pml_base_bsend_request_fini(ompi_request_t* request)
         return OMPI_SUCCESS;
 
     /* remove from list of pending requests */
-    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
+    OPAL_THREAD_LOCK(&buffer->bsend_mutex);
 
     /* free buffer */
-    mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, (void *)sendreq->req_addr);
+    buffer->bsend_allocator->alc_free(buffer->bsend_allocator, (void *)sendreq->req_addr);
     sendreq->req_addr = sendreq->req_base.req_addr;
 
     /* decrement count of buffered requests */
-    if(--mca_pml_bsend_count == 0)
-        opal_condition_signal(&mca_pml_bsend_condition);
+    if(--buffer->bsend_count == 0)
+        opal_condition_signal(&buffer->bsend_condition);
 
-    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+    OPAL_THREAD_UNLOCK(&buffer->bsend_mutex);
     return OMPI_SUCCESS;
 }
 
diff --git a/ompi/mca/pml/base/pml_base_bsend.h b/ompi/mca/pml/base/pml_base_bsend.h
index 0d3c47a7ca5..b4eeb9541a0 100644
--- a/ompi/mca/pml/base/pml_base_bsend.h
+++ b/ompi/mca/pml/base/pml_base_bsend.h
@@ -25,23 +25,48 @@
 #include "ompi_config.h"
 #include "ompi/request/request.h"
+#include "ompi/communicator/communicator.h"
+#include "opal/mca/allocator/allocator.h"
 
 BEGIN_C_DECLS
 
+enum ompi_bsend_buffer_type_t {
+    BASE_BSEND_BUF = 0,
+    COMM_BSEND_BUF,
+    SESSION_BSEND_BUF,
+};
+typedef enum ompi_bsend_buffer_type_t ompi_bsend_buffer_type_t;
+
+struct mca_pml_bsend_buffer_t {
+    opal_object_t super;
+    mca_allocator_base_component_t* allocator_component;
+    mca_allocator_base_module_t* bsend_allocator;  /* sub-allocator to manage users buffer */
+    opal_mutex_t bsend_mutex;
+    opal_condition_t bsend_condition;
+    size_t bsend_usersize;          /* user provided buffer size */
+    unsigned char *bsend_userbase;  /* user provided buffer base */
+    unsigned char *bsend_base;      /* adjusted base of user buffer */
+    unsigned char *bsend_addr;      /* current offset into user buffer */
+    size_t bsend_size;              /* adjusted size of user buffer */
+    size_t bsend_count;             /* number of outstanding requests */
+    size_t bsend_pagesz;            /* mmap page size */
+    int bsend_pagebits;             /* number of bits in pagesz */
+};
+typedef struct mca_pml_bsend_buffer_t mca_pml_bsend_buffer_t;
+
 OMPI_DECLSPEC int mca_pml_base_bsend_init (void);
 
-int mca_pml_base_bsend_attach(void* addr, size_t size);
-int mca_pml_base_bsend_detach(void* addr, size_t* size);
+OMPI_DECLSPEC int mca_pml_base_bsend_attach(ompi_bsend_buffer_type_t type, void *obj, void* addr, size_t size);
+OMPI_DECLSPEC int mca_pml_base_bsend_detach(ompi_bsend_buffer_type_t type, void *obj, void* addr, size_t* size);
 
 OMPI_DECLSPEC int mca_pml_base_bsend_request_alloc(ompi_request_t*);
 OMPI_DECLSPEC int mca_pml_base_bsend_request_start(ompi_request_t*);
 OMPI_DECLSPEC int mca_pml_base_bsend_request_fini(ompi_request_t*);
-OMPI_DECLSPEC void* mca_pml_base_bsend_request_alloc_buf( size_t length );
-OMPI_DECLSPEC int mca_pml_base_bsend_request_free(void* addr);
+OMPI_DECLSPEC void* mca_pml_base_bsend_request_alloc_buf(ompi_communicator_t *comm, size_t length );
+OMPI_DECLSPEC int mca_pml_base_bsend_request_free(ompi_communicator_t *comm, void* addr);
 
-extern unsigned char *mca_pml_bsend_userbase;  /* user provided buffer base */
-extern unsigned char *mca_pml_bsend_base;      /* adjusted base of user buffer */
-extern unsigned char *mca_pml_bsend_addr;      /* current offset into user buffer */
+OMPI_DECLSPEC int mca_pml_base_bsend_flush(ompi_bsend_buffer_type_t type, void *obj);
+OMPI_DECLSPEC int mca_pml_base_bsend_iflush(ompi_bsend_buffer_type_t type, void *obj, ompi_request_t **req);
 
 END_C_DECLS
diff --git a/ompi/mca/pml/cm/pml_cm_sendreq.h b/ompi/mca/pml/cm/pml_cm_sendreq.h
index 4872943c05a..a3a301b4ea3 100644
--- a/ompi/mca/pml/cm/pml_cm_sendreq.h
+++ b/ompi/mca/pml/cm/pml_cm_sendreq.h
@@ -368,7 +368,8 @@ do {                                                    \
                                                         \
         if(sendreq->req_count > 0) {                    \
             sendreq->req_buff =                         \
-                mca_pml_base_bsend_request_alloc_buf(sendreq->req_count); \
+                mca_pml_base_bsend_request_alloc_buf(sendreq->req_send.req_base.req_comm, \
+                                                     sendreq->req_count); \
             if (NULL == sendreq->req_buff) {            \
                 ret = MPI_ERR_BUFFER;                   \
             } else {                                    \
@@ -431,7 +432,9 @@ do {                                                    \
     if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
         sendreq->req_count > 0 ) {                      \
-        mca_pml_base_bsend_request_free(sendreq->req_buff); \
+        ompi_communicator_t *comm;                      \
+        comm = (ompi_communicator_t *)sendreq->req_send.req_base.req_comm; \
+        mca_pml_base_bsend_request_free(comm, sendreq->req_buff); \
     }                                                   \
                                                         \
     if( !REQUEST_COMPLETE(&sendreq->req_send.req_base.req_ompi)) {      \
diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c
index c748b02e12f..3bb61ad0490 100644
--- a/ompi/mca/pml/ucx/pml_ucx.c
+++ b/ompi/mca/pml/ucx/pml_ucx.c
@@ -785,7 +785,7 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat
 
 static ucs_status_ptr_t
 mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count,
-                  ompi_datatype_t *datatype, uint64_t pml_tag)
+                  ompi_datatype_t *datatype, uint64_t pml_tag, ompi_communicator_t *comm)
 {
     ompi_request_t *req;
     void *packed_data;
@@ -801,7 +801,7 @@ mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count,
                               &opal_conv);
     opal_convertor_get_packed_size(&opal_conv, &packed_length);
 
-    packed_data = mca_pml_base_bsend_request_alloc_buf(packed_length);
+    packed_data = mca_pml_base_bsend_request_alloc_buf(comm, packed_length);
     if (OPAL_UNLIKELY(NULL == packed_data)) {
         OBJ_DESTRUCT(&opal_conv);
         PML_UCX_ERROR("bsend: failed to allocate buffer");
@@ -816,7 +816,7 @@ mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count,
     offset = 0;
     opal_convertor_set_position(&opal_conv, &offset);
     if (0 > opal_convertor_pack(&opal_conv, &iov, &iov_count, &packed_length)) {
-        mca_pml_base_bsend_request_free(packed_data);
+        mca_pml_base_bsend_request_free(comm, packed_data);
         OBJ_DESTRUCT(&opal_conv);
         PML_UCX_ERROR("bsend: failed to pack user datatype");
         return UCS_STATUS_PTR(OMPI_ERROR);
@@ -829,17 +829,18 @@ mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count,
                            mca_pml_ucx_bsend_completion);
     if (NULL == req) {
         /* request was completed in place */
-        mca_pml_base_bsend_request_free(packed_data);
+        mca_pml_base_bsend_request_free(comm, packed_data);
         return NULL;
     }
 
     if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(req))) {
-        mca_pml_base_bsend_request_free(packed_data);
+        mca_pml_base_bsend_request_free(comm, packed_data);
         PML_UCX_ERROR("ucx bsend failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
         return UCS_STATUS_PTR(OMPI_ERROR);
     }
 
     req->req_complete_cb_data = packed_data;
+    req->req_mpi_object.comm  = comm;
     return NULL;
 }
 
@@ -850,10 +851,11 @@ static inline ucs_status_ptr_t mca_pml_ucx_common_send(ucp_ep_h ep, const void *
                                                        ucp_datatype_t ucx_datatype,
                                                        ucp_tag_t tag, mca_pml_base_send_mode_t mode,
+                                                       ompi_communicator_t *comm,
                                                        ucp_send_callback_t cb)
 {
     if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_BUFFERED == mode)) {
-        return mca_pml_ucx_bsend(ep, buf, count, datatype, tag);
+        return mca_pml_ucx_bsend(ep, buf, count, datatype, tag, comm);
     } else if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
         return ucp_tag_send_sync_nb(ep, buf, count, ucx_datatype, tag, cb);
     } else {
@@ -869,12 +871,13 @@ mca_pml_ucx_common_send_nbx(ucp_ep_h ep, const void *buf,
                             ompi_datatype_t *datatype,
                             ucp_tag_t tag,
                             mca_pml_base_send_mode_t mode,
+                            ompi_communicator_t *comm,
                             ucp_request_param_t *param)
 {
     pml_ucx_datatype_t *op_data = mca_pml_ucx_get_op_data(datatype);
 
     if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_BUFFERED == mode)) {
-        return mca_pml_ucx_bsend(ep, buf, count, datatype, tag);
+        return mca_pml_ucx_bsend(ep, buf, count, datatype, tag, comm);
     } else if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
         return ucp_tag_send_sync_nb(ep, buf, count,
                                     mca_pml_ucx_get_datatype(datatype), tag,
@@ -913,11 +916,11 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
 #if HAVE_DECL_UCP_TAG_SEND_NBX
     req = (ompi_request_t*)mca_pml_ucx_common_send_nbx(ep, buf, count, datatype,
                                                        PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode,
-                                                       &mca_pml_ucx_get_op_data(datatype)->op_param.isend);
+                                                       comm, &mca_pml_ucx_get_op_data(datatype)->op_param.isend);
 #else
     req = (ompi_request_t*)mca_pml_ucx_common_send(ep, buf, count, datatype,
                                                    mca_pml_ucx_get_datatype(datatype),
-                                                   PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode,
+                                                   PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode, comm,
                                                    mca_pml_ucx_send_completion);
 #endif
 
@@ -951,13 +954,13 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
 static inline __opal_attribute_always_inline__ int
 mca_pml_ucx_send_nb(ucp_ep_h ep, const void *buf, size_t count,
                     ompi_datatype_t *datatype, ucp_datatype_t ucx_datatype,
-                    ucp_tag_t tag, mca_pml_base_send_mode_t mode)
+                    ucp_tag_t tag, mca_pml_base_send_mode_t mode, ompi_communicator_t *comm)
 {
     ompi_request_t *req;
 
     req = (ompi_request_t*)mca_pml_ucx_common_send(ep, buf, count, datatype,
                                                    mca_pml_ucx_get_datatype(datatype),
-                                                   tag, mode,
+                                                   tag, mode, comm,
                                                    mca_pml_ucx_send_completion_empty);
     if (OPAL_LIKELY(req == NULL)) {
         return OMPI_SUCCESS;
@@ -1048,7 +1051,7 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i
 
     return mca_pml_ucx_send_nb(ep, buf, count, datatype,
                                mca_pml_ucx_get_datatype(datatype),
-                               PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode);
+                               PML_UCX_MAKE_SEND_TAG(tag, comm, cid), mode, comm);
 }
 
 int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm,
@@ -1219,6 +1222,7 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests)
                                           preq->datatype,
                                           preq->tag,
                                           preq->send.mode,
+                                          preq->ompi.req_mpi_object.comm,
                                           mca_pml_ucx_psend_completion);
         } else {
             PML_UCX_VERBOSE(8, "start recv request %p", (void*)preq);
diff --git a/ompi/mca/pml/ucx/pml_ucx_request.c b/ompi/mca/pml/ucx/pml_ucx_request.c
index 1a8d0dbc043..88c63c80ebe 100644
--- a/ompi/mca/pml/ucx/pml_ucx_request.c
+++ b/ompi/mca/pml/ucx/pml_ucx_request.c
@@ -62,10 +62,12 @@ __opal_attribute_always_inline__
 static inline void mca_pml_ucx_bsend_completion_internal(void *request, ucs_status_t status)
 {
     ompi_request_t *req = request;
+    ompi_communicator_t *comm;
 
     PML_UCX_VERBOSE(8, "bsend request %p buffer %p completed with status %s", (void*)req,
                     req->req_complete_cb_data, ucs_status_string(status));
-    mca_pml_base_bsend_request_free(req->req_complete_cb_data);
+    comm = req->req_mpi_object.comm;
+    mca_pml_base_bsend_request_free(comm, req->req_complete_cb_data);
     req->req_complete_cb_data = NULL;
     mca_pml_ucx_set_send_status(&req->req_status, status);
     PML_UCX_ASSERT( !(REQUEST_COMPLETE(req)));
diff --git a/ompi/mpi/c/Makefile.am b/ompi/mpi/c/Makefile.am
index c8fad462772..0f313836368 100644
--- a/ompi/mpi/c/Makefile.am
+++ b/ompi/mpi/c/Makefile.am
@@ -78,6 +78,8 @@ prototype_sources = \
         bsend_init.c.in \
         buffer_attach.c.in \
         buffer_detach.c.in \
+        buffer_flush.c.in \
+        buffer_iflush.c.in \
         cancel.c.in \
         cart_coords.c.in \
         cart_create.c.in \
@@ -89,6 +91,7 @@ prototype_sources = \
         cart_sub.c.in \
         close_port.c.in \
         comm_accept.c.in \
+        comm_attach_buffer.c.in \
         comm_c2f.c.in \
         comm_call_errhandler.c.in \
         comm_compare.c.in \
@@ -99,10 +102,12 @@ prototype_sources = \
         comm_create_from_group.c.in \
         comm_create_keyval.c.in \
         comm_delete_attr.c.in \
+        comm_detach_buffer.c.in \
         comm_disconnect.c.in \
         comm_dup.c.in \
         comm_dup_with_info.c.in \
         comm_f2c.c.in \
+        comm_flush_buffer.c.in \
         comm_free.c.in \
         comm_free_keyval.c.in \
         comm_get_attr.c.in \
@@ -113,6 +118,7 @@ prototype_sources = \
         comm_group.c.in \
         comm_idup.c.in \
         comm_idup_with_info.c.in \
+        comm_iflush_buffer.c.in \
         comm_join.c.in \
         comm_rank.c.in \
         comm_remote_group.c.in \
@@ -360,16 +366,20 @@ prototype_sources = \
         send_init.c.in \
         sendrecv.c.in \
         sendrecv_replace.c.in \
+        session_attach_buffer.c.in \
         session_c2f.c.in \
         session_call_errhandler.c.in \
         session_create_errhandler.c.in \
+        session_detach_buffer.c.in \
         session_f2c.c.in \
         session_finalize.c.in \
+        session_flush_buffer.c.in \
         session_get_errhandler.c.in \
         session_get_info.c.in \
         session_get_nth_pset.c.in \
         session_get_num_psets.c.in \
         session_get_pset_info.c.in \
+        session_iflush_buffer.c.in \
         session_init.c.in \
         session_set_errhandler.c.in \
         session_set_info.c.in \
diff --git a/ompi/mpi/c/buffer_attach.c.in b/ompi/mpi/c/buffer_attach.c.in
index 53d5421db8e..76de0476af9 100644
--- a/ompi/mpi/c/buffer_attach.c.in
+++ b/ompi/mpi/c/buffer_attach.c.in
@@ -40,7 +40,7 @@ PROTOTYPE ERROR_CLASS buffer_attach(BUFFER_OUT buffer, COUNT size)
         }
     }
 
-    ret = mca_pml_base_bsend_attach(buffer, size);
+    ret = mca_pml_base_bsend_attach(BASE_BSEND_BUF, NULL, buffer, size);
 
     return ret;
 }
diff --git a/ompi/mpi/c/buffer_detach.c.in b/ompi/mpi/c/buffer_detach.c.in
index 6e15b64e3ef..13074bd7a14 100644
--- a/ompi/mpi/c/buffer_detach.c.in
+++ b/ompi/mpi/c/buffer_detach.c.in
@@ -41,9 +41,13 @@ PROTOTYPE ERROR_CLASS buffer_detach(BUFFER_OUT buffer, COUNT_OUT size)
         }
     }
 
-    ret = mca_pml_base_bsend_detach(buffer, &size_arg);
+    ret = mca_pml_base_bsend_detach(BASE_BSEND_BUF, NULL, buffer, &size_arg);
     if (MPI_SUCCESS == ret) {
+#if OMPI_BIGCOUNT_SRC
+        *size = size_arg;
+#else
         *size = (int)size_arg;
+#endif
     }
 
     return ret;
diff --git a/ompi/mpi/c/buffer_flush.c.in b/ompi/mpi/c/buffer_flush.c.in
new file mode 100644
index 00000000000..2888ff77393
--- /dev/null
+++ b/ompi/mpi/c/buffer_flush.c.in
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024      Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS buffer_flush()
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+    }
+
+    ret = mca_pml_base_bsend_flush(BASE_BSEND_BUF, NULL);
+
+    return ret;
+}
diff --git a/ompi/mpi/c/buffer_iflush.c.in b/ompi/mpi/c/buffer_iflush.c.in
new file mode 100644
index 00000000000..132f67394db
--- /dev/null
+++ b/ompi/mpi/c/buffer_iflush.c.in
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS buffer_iflush(REQUEST_INOUT request)
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == request) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_REQUEST,
+                                                   "MPI_Buffer_iflush");
+        }
+    }
+
+    ret = mca_pml_base_bsend_iflush(BASE_BSEND_BUF, NULL, request);
+
+    return ret;
+}
diff --git a/ompi/mpi/c/comm_attach_buffer.c.in b/ompi/mpi/c/comm_attach_buffer.c.in
new file mode 100644
index 00000000000..f633e7ab851
--- /dev/null
+++ b/ompi/mpi/c/comm_attach_buffer.c.in
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS comm_attach_buffer(COMM comm, BUFFER_OUT buffer, COUNT size)
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == buffer || size < 0) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_ARG, FUNC_NAME);
+        }
+        if (ompi_comm_invalid(comm)) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
+        }
+    }
+
+    ret = mca_pml_base_bsend_attach(COMM_BSEND_BUF, comm, buffer, size);
+
+    return ret;
+}
diff --git a/ompi/mpi/c/comm_detach_buffer.c.in b/ompi/mpi/c/comm_detach_buffer.c.in
new file mode 100644
index 00000000000..5d1cac57f4a
--- /dev/null
+++ b/ompi/mpi/c/comm_detach_buffer.c.in
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS comm_detach_buffer(COMM comm, BUFFER_OUT buffer, COUNT_OUT size)
+{
+    size_t size_arg;
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == buffer || NULL == size) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_ARG, FUNC_NAME);
+        }
+        if (ompi_comm_invalid(comm)) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
+        }
+    }
+
+    ret = mca_pml_base_bsend_detach(COMM_BSEND_BUF, comm, buffer, &size_arg);
+    if (MPI_SUCCESS == ret) {
+#if OMPI_BIGCOUNT_SRC
+        *size = size_arg;
+#else
+        *size = (int)size_arg;
+#endif
+    }
+
+    return ret;
+}
diff --git a/ompi/mpi/c/comm_flush_buffer.c.in b/ompi/mpi/c/comm_flush_buffer.c.in
new file mode 100644
index 00000000000..de39c99716b
--- /dev/null
+++ b/ompi/mpi/c/comm_flush_buffer.c.in
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS comm_flush_buffer(COMM comm)
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (ompi_comm_invalid(comm)) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
+        }
+    }
+
+    ret = mca_pml_base_bsend_flush(COMM_BSEND_BUF, comm);
+
+    return ret;
+}
diff --git a/ompi/mpi/c/comm_iflush_buffer.c.in b/ompi/mpi/c/comm_iflush_buffer.c.in
new file mode 100644
index 00000000000..46efea17e57
--- /dev/null
+++ b/ompi/mpi/c/comm_iflush_buffer.c.in
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation. All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation. All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart. All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS comm_iflush_buffer(COMM comm, REQUEST_INOUT request)
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (ompi_comm_invalid(comm)) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_COMM, FUNC_NAME);
+        }
+        if (NULL == request) {
+            ret = MPI_ERR_REQUEST;
+        }
+        OMPI_ERRHANDLER_CHECK(ret, comm, ret, FUNC_NAME);
+    }
+
+    ret = mca_pml_base_bsend_iflush(COMM_BSEND_BUF, comm, request);
+
+    return ret;
+}
diff --git a/ompi/mpi/c/session_attach_buffer.c.in b/ompi/mpi/c/session_attach_buffer.c.in
new file mode 100644
index 00000000000..c6d3d339c4c
--- /dev/null
+++ b/ompi/mpi/c/session_attach_buffer.c.in
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation. All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation. All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart. All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS session_attach_buffer(SESSION session, BUFFER_OUT buffer, COUNT size)
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == buffer || size < 0) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_ARG, FUNC_NAME);
+        }
+        if (ompi_instance_invalid(session)) {
+            if (NULL != session) {
+                return OMPI_ERRHANDLER_INVOKE(session, MPI_ERR_SESSION, FUNC_NAME);
+            } else {
+                return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_SESSION, FUNC_NAME);
+            }
+        }
+    }
+
+    ret = mca_pml_base_bsend_attach(SESSION_BSEND_BUF, session, buffer, size);
+
+    return ret;
+}
diff --git a/ompi/mpi/c/session_detach_buffer.c.in b/ompi/mpi/c/session_detach_buffer.c.in
new file mode 100644
index 00000000000..d9365439b2c
--- /dev/null
+++ b/ompi/mpi/c/session_detach_buffer.c.in
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation. All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation. All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart. All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS session_detach_buffer(SESSION session, BUFFER_OUT buffer, COUNT_OUT size)
+{
+    size_t size_arg;
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == buffer || NULL == size) {
+            return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_ARG, FUNC_NAME);
+        }
+        if (ompi_instance_invalid(session)) {
+            if (NULL != session) {
+                return OMPI_ERRHANDLER_INVOKE(session, MPI_ERR_SESSION, FUNC_NAME);
+            } else {
+                return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_SESSION, FUNC_NAME);
+            }
+        }
+    }
+
+    ret = mca_pml_base_bsend_detach(SESSION_BSEND_BUF, session, buffer, &size_arg);
+    if (MPI_SUCCESS == ret) {
+#if OMPI_BIGCOUNT_SRC
+        *size = size_arg;
+#else
+        *size = (int)size_arg;
+#endif
+    }
+
+    return ret;
+}
diff --git a/ompi/mpi/c/session_flush_buffer.c.in b/ompi/mpi/c/session_flush_buffer.c.in
new file mode 100644
index 00000000000..1693e56dbac
--- /dev/null
+++ b/ompi/mpi/c/session_flush_buffer.c.in
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation. All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation. All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart. All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS session_flush_buffer(SESSION session)
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (ompi_instance_invalid(session)) {
+            if (NULL != session) {
+                return OMPI_ERRHANDLER_INVOKE(session, MPI_ERR_SESSION, FUNC_NAME);
+            } else {
+                return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_SESSION, FUNC_NAME);
+            }
+        }
+    }
+
+    ret = mca_pml_base_bsend_flush(SESSION_BSEND_BUF, session);
+
+    return ret;
+}
diff --git a/ompi/mpi/c/session_iflush_buffer.c.in b/ompi/mpi/c/session_iflush_buffer.c.in
new file mode 100644
index 00000000000..00016cf1927
--- /dev/null
+++ b/ompi/mpi/c/session_iflush_buffer.c.in
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation. All rights reserved.
+ * Copyright (c) 2004-2020 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation. All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart. All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2015      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2024-2025 Triad National Security, LLC. All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/pml/base/pml_base_bsend.h"
+
+PROTOTYPE ERROR_CLASS session_iflush_buffer(SESSION session, REQUEST_INOUT request)
+{
+    int ret = OMPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (ompi_instance_invalid(session)) {
+            if (NULL != session) {
+                return OMPI_ERRHANDLER_INVOKE(session, MPI_ERR_SESSION, FUNC_NAME);
+            } else {
+                return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_SESSION, FUNC_NAME);
+            }
+        }
+        if (NULL == request) {
+            ret = MPI_ERR_REQUEST;
+        }
+        OMPI_ERRHANDLER_CHECK(ret, session, ret, FUNC_NAME);
+    }
+
+    ret = mca_pml_base_bsend_iflush(SESSION_BSEND_BUF, session, request);
+
+    return ret;
+}

From 4db467f3dfb44e2715645d7ac49362f7a90cc20b Mon Sep 17 00:00:00 2001
From: Howard Pritchard
Date: Tue, 22 Jul 2025 10:33:56 -0600
Subject: [PATCH 2/4] turn off some debug statements that cause mpi4py to be
 unhappy

Signed-off-by: Howard Pritchard
---
 ompi/mca/pml/base/pml_base_bsend.c | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/ompi/mca/pml/base/pml_base_bsend.c b/ompi/mca/pml/base/pml_base_bsend.c
index e1361cf5360..9e81a99e558 100644
--- a/ompi/mca/pml/base/pml_base_bsend.c
+++ b/ompi/mca/pml/base/pml_base_bsend.c
@@ -46,7 +46,6 @@ static void mca_pml_bsend_buffer_construct(mca_pml_bsend_buffer_t *buffer);
 static void mca_pml_bsend_buffer_construct(mca_pml_bsend_buffer_t *buffer)
 {
-    fprintf(stderr, "inside buffer construct %p\n", (void *)buffer);
     buffer->bsend_allocator = NULL;
     buffer->bsend_userbase = NULL;
     buffer->bsend_base = NULL;
@@ -54,22 +53,15 @@ static void mca_pml_bsend_buffer_construct(mca_pml_bsend_buffer_t *buffer)
     buffer->bsend_size = 0UL;
     buffer->bsend_count = 0UL;
     buffer->bsend_pagebits = 0;
-#if 1
-    fprintf(stderr, "constructing mutext at %p \n", &buffer->bsend_mutex);
     OBJ_CONSTRUCT(&buffer->bsend_mutex, opal_mutex_t);
     OBJ_CONSTRUCT(&buffer->bsend_condition, opal_condition_t);
-#endif
 }

 static void mca_pml_bsend_buffer_destruct(mca_pml_bsend_buffer_t *buffer);
 static void mca_pml_bsend_buffer_destruct(mca_pml_bsend_buffer_t *buffer)
 {
-    fprintf(stderr, "inside buffer destruct %p\n", (void *)buffer);
-#if 1
-    fprintf(stderr, "destrucint mutext at %p \n", &buffer->bsend_mutex);
     OBJ_DESTRUCT(&buffer->bsend_mutex);
     OBJ_DESTRUCT(&buffer->bsend_condition);
-#endif
 }

 OBJ_CLASS_INSTANCE (mca_pml_bsend_buffer_t, opal_object_t,
@@ -346,7 +338,6 @@ int mca_pml_base_bsend_detach(ompi_bsend_buffer_type_t type, void *obj, void* ad
     ompi_communicator_t *comm = NULL;
     ompi_instance_t *instance = NULL;

-    fprintf(stderr, "inside mca_pml_base_bsend_detach addr %p size %p\n", addr, size);
     switch (type) {
     case BASE_BSEND_BUF:
         buf_instance = mca_pml_bsend_buffer;
@@ -380,7 +371,6 @@ int mca_pml_base_bsend_detach(ompi_bsend_buffer_type_t type, void *obj, void* ad
         break;
     }

-    fprintf(stderr, "buf_instance->bsend_count %d\n", buf_instance->bsend_count);
     while(buf_instance->bsend_count != 0) {
         opal_condition_wait(&buf_instance->bsend_condition, &buf_instance->bsend_mutex);
         opal_progress();
@@ -423,7 +413,6 @@ int mca_pml_base_bsend_detach(ompi_bsend_buffer_type_t type, void *obj, void* ad
         break;
     }

-    fprintf(stderr, "about to destruct %p\n", (void *)buf_instance);
     OBJ_DESTRUCT(buf_instance);

 fn_exit:
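
Before the man-page patch that follows, a quick end-to-end illustration of how the session-level entry points added above are meant to be driven. This is a sketch for reviewers, not part of the series: it assumes an MPI 4.x build with sessions support and these new routines, at least two ranks, the standard "mpi://WORLD" process set, and omits error checking.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(void)
{
    MPI_Session session;
    MPI_Group group;
    MPI_Comm comm;
    int rank, bufsize, s1, msg = 1;
    char *old;

    /* standard MPI 4.x sessions bootstrap: session -> group -> communicator */
    MPI_Session_init(MPI_INFO_NULL, MPI_ERRORS_ARE_FATAL, &session);
    MPI_Group_from_session_pset(session, "mpi://WORLD", &group);
    MPI_Comm_create_from_group(group, "example.bsend", MPI_INFO_NULL,
                               MPI_ERRORS_ARE_FATAL, &comm);
    MPI_Group_free(&group);
    MPI_Comm_rank(comm, &rank);

    /* attach a session-level buffer; communicators derived from this session
       use it for buffered sends unless they have their own buffer attached */
    MPI_Pack_size(1, MPI_INT, comm, &s1);
    bufsize = s1 + MPI_BSEND_OVERHEAD;
    MPI_Session_attach_buffer(session, malloc(bufsize), bufsize);

    if (0 == rank) {
        MPI_Bsend(&msg, 1, MPI_INT, 1, 0, comm);
        MPI_Session_flush_buffer(session);   /* block until buffered sends drain */
    } else if (1 == rank) {
        MPI_Recv(&msg, 1, MPI_INT, 0, 0, comm, MPI_STATUS_IGNORE);
    }

    /* detach hands back the buffer address so the caller can free it */
    MPI_Session_detach_buffer(session, &old, &bufsize);
    free(old);

    MPI_Comm_disconnect(&comm);
    MPI_Session_finalize(&session);
    return 0;
}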
From b090eb333f44135a65023f10daf7968f096c63c0 Mon Sep 17 00:00:00 2001
From: Howard Pritchard
Date: Wed, 23 Jul 2025 08:59:09 -0600
Subject: [PATCH 3/4] add man pages

Signed-off-by: Howard Pritchard
---
 docs/Makefile.am                              | 10 +++
 docs/man-openmpi/man3/MPI_Buffer_attach.3.rst | 14 ++-
 docs/man-openmpi/man3/MPI_Buffer_detach.3.rst |  4 +-
 docs/man-openmpi/man3/MPI_Buffer_flush.3.rst  | 42 +++++++++
 docs/man-openmpi/man3/MPI_Buffer_iflush.3.rst |  8 ++
 .../man3/MPI_Comm_attach_buffer.3.rst         | 67 ++++++++++++++
 .../man3/MPI_Comm_detach_buffer.3.rst         | 88 +++++++++++++++++++
 .../man3/MPI_Comm_flush_buffer.3.rst          | 44 ++++++++++
 .../man3/MPI_Comm_iflush_buffer.3.rst         |  8 ++
 .../man3/MPI_Session_attach_buffer.3.rst      | 72 +++++++++++++++
 .../man3/MPI_Session_detach_buffer.3.rst      | 88 +++++++++++++++++++
 .../man3/MPI_Session_flush_buffer.3.rst       | 44 ++++++++++
 .../man3/MPI_Session_iflush_buffer.3.rst      |  8 ++
 docs/man-openmpi/man3/index.rst               | 10 +++
 14 files changed, 501 insertions(+), 6 deletions(-)
 create mode 100644 docs/man-openmpi/man3/MPI_Buffer_flush.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Buffer_iflush.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Comm_attach_buffer.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Comm_detach_buffer.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Comm_flush_buffer.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Comm_iflush_buffer.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Session_attach_buffer.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Session_detach_buffer.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Session_flush_buffer.3.rst
 create mode 100644 docs/man-openmpi/man3/MPI_Session_iflush_buffer.3.rst

diff --git a/docs/Makefile.am b/docs/Makefile.am
index f6993850939..4b267e608c1 100644
--- a/docs/Makefile.am
+++ b/docs/Makefile.am
@@ -118,6 +118,8 @@ OMPI_MAN3 = \
         MPI_Bsend_init.3 \
         MPI_Buffer_attach.3 \
         MPI_Buffer_detach.3 \
+        MPI_Buffer_flush.3 \
+        MPI_Buffer_iflush.3 \
         MPI_Cancel.3 \
         MPI_Cart_coords.3 \
         MPI_Cart_create.3 \
@@ -129,6 +131,7 @@ OMPI_MAN3 = \
         MPI_Cart_sub.3 \
         MPI_Close_port.3 \
         MPI_Comm_accept.3 \
+        MPI_Comm_attach_buffer.3 \
         MPI_Comm_c2f.3 \
         MPI_Comm_call_errhandler.3 \
         MPI_Comm_compare.3 \
@@ -139,10 +142,12 @@ OMPI_MAN3 = \
         MPI_Comm_create_group.3 \
         MPI_Comm_create_keyval.3 \
         MPI_Comm_delete_attr.3 \
+        MPI_Comm_detach_buffer.3 \
         MPI_Comm_disconnect.3 \
         MPI_Comm_dup.3 \
         MPI_Comm_dup_with_info.3 \
         MPI_Comm_f2c.3 \
+        MPI_Comm_flush_buffer.3 \
         MPI_Comm_free.3 \
         MPI_Comm_free_keyval.3 \
         MPI_Comm_get_attr.3 \
@@ -153,6 +158,7 @@ OMPI_MAN3 = \
         MPI_Comm_group.3 \
         MPI_Comm_idup.3 \
         MPI_Comm_idup_with_info.3 \
+        MPI_Comm_iflush_buffer.3 \
         MPI_Comm_join.3 \
         MPI_Comm_rank.3 \
         MPI_Comm_remote_group.3 \
@@ -402,13 +408,17 @@ OMPI_MAN3 = \
         MPI_Send_init.3 \
         MPI_Sendrecv.3 \
         MPI_Sendrecv_replace.3 \
+        MPI_Session_attach_buffer.3 \
         MPI_Session_create_errhandler.3 \
+        MPI_Session_detach_buffer.3 \
         MPI_Session_f2c.3 \
         MPI_Session_finalize.3 \
+        MPI_Session_flush_buffer.3 \
         MPI_Session_get_info.3 \
         MPI_Session_get_nth_pset.3 \
         MPI_Session_get_num_psets.3 \
         MPI_Session_get_pset_info.3 \
+        MPI_Session_iflush_buffer.3 \
         MPI_Session_init.3 \
         MPI_Sizeof.3 \
         MPI_Ssend.3 \
diff --git a/docs/man-openmpi/man3/MPI_Buffer_attach.3.rst b/docs/man-openmpi/man3/MPI_Buffer_attach.3.rst
index dfd42ed4957..7507f1c5812 100644
--- a/docs/man-openmpi/man3/MPI_Buffer_attach.3.rst
+++ b/docs/man-openmpi/man3/MPI_Buffer_attach.3.rst
@@ -14,7 +14,7 @@ INPUT PARAMETERS
 ----------------

 * ``buf`` : Initial buffer address (choice).
-* ``size`` : Buffer size, in bytes (integer).
+* ``size`` : Buffer size, in bytes (integer). Ignored when buf is MPI_BUFFER_AUTOMATIC.

 OUTPUT PARAMETER
 ----------------
@@ -31,9 +31,15 @@ mode. Only one buffer can be attached to a process at a time.
 NOTES
 -----

-The size given should be the sum of the sizes of all outstanding Bsends
-that you intend to have, plus MPI_BSEND_OVERHEAD bytes for each Bsend
-that you do. For the purposes of calculating size, you should use
+This buffer is to be used for buffering outgoing messages sent when using a
+communicator to which no communicator-specific buffer is attached or which
+is derived from a session to which no session-specific buffer is attached
+at the time the buffered mode send is started.
+
+When not using MPI_BUFFER_AUTOMATIC, the size given should be the sum
+of the sizes of all outstanding Bsends that you intend to have, plus
+MPI_BSEND_OVERHEAD bytes for each Bsend that you do. For the purposes
+of calculating size, you should use
 :ref:`MPI_Pack_size`. In other words, in the code

 .. code-block:: c
diff --git a/docs/man-openmpi/man3/MPI_Buffer_detach.3.rst b/docs/man-openmpi/man3/MPI_Buffer_detach.3.rst
index 1955974288d..c2cf8e16692 100644
--- a/docs/man-openmpi/man3/MPI_Buffer_detach.3.rst
+++ b/docs/man-openmpi/man3/MPI_Buffer_detach.3.rst
@@ -15,7 +15,7 @@ OUTPUT PARAMETERS
 -----------------

 * ``buf`` : Initial buffer address (choice).
-* ``size`` : Buffer size, in bytes (integer).
+* ``size`` : Buffer size, in bytes (integer). Undefined if MPI_BUFFER_AUTOMATIC was used when attaching.
 * ``ierror`` : Fortran only: Error status (integer).

 DESCRIPTION
 -----------
@@ -67,7 +67,7 @@ usages).
 For Fortran: The Fortran binding for this routine is different. Because
 Fortran does not have pointers, it is impossible to provide a way to use
 the output of this routine to exchange buffers. In this case, only the
-size field is set.
+size field is set. Note this does not apply when using the mpi_f08 bindings.

 For C: Even though the buf argument is declared as void, it is really
 the address of a void pointer. See Rationale, below, for more details.
diff --git a/docs/man-openmpi/man3/MPI_Buffer_flush.3.rst b/docs/man-openmpi/man3/MPI_Buffer_flush.3.rst
new file mode 100644
index 00000000000..8e41cf4098c
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Buffer_flush.3.rst
@@ -0,0 +1,42 @@
+.. _mpi_buffer_flush:
+
+MPI_Buffer_flush
+================
+
+.. include_body
+
+:ref:`MPI_Buffer_flush`, :ref:`MPI_Buffer_iflush` |mdash| Wait until all messages currently in
+the MPI process specific buffer of the calling MPI process have been transmitted.
+
+.. The following directive tells the man page generation script to
+   generate multiple bindings for this file.
+.. mpi-bindings: MPI_Comm_flush_buffer, MPI_Comm_iflush_buffer
+
+.. The following file was automatically generated
+.. include:: ./bindings/mpi_buffer_flush.rst
+
+INPUT PARAMETERS
+----------------
+
+OUTPUT PARAMETER
+----------------
+
+* ``ierror`` : Fortran only: Error status (integer).
+
+DESCRIPTION
+-----------
+
+:ref:`MPI_Buffer_flush` will not return until all messages currently in the MPI process
+specific buffer of the calling MPI process have been transmitted. The non-blocking variant
+:ref:`MPI_Buffer_iflush` returns an MPI request that can be queried using MPI request
+status functions to determine when all messages in the MPI process specific buffer of the
+calling MPI process have been transmitted.
+
+ERRORS
+------
+
+.. include:: ./ERRORS.rst
+
+.. seealso::
+   * :ref:`MPI_Buffer_attach`
+   * :ref:`MPI_Buffer_detach`
diff --git a/docs/man-openmpi/man3/MPI_Buffer_iflush.3.rst b/docs/man-openmpi/man3/MPI_Buffer_iflush.3.rst
new file mode 100644
index 00000000000..cd3b707f881
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Buffer_iflush.3.rst
@@ -0,0 +1,8 @@
+.. _mpi_buffer_iflush:
+
+MPI_Buffer_iflush
+=================
+   .. include_body
+
+.. include:: ../man3/MPI_Buffer_flush.3.rst
+   :start-after: .. include_body
diff --git a/docs/man-openmpi/man3/MPI_Comm_attach_buffer.3.rst b/docs/man-openmpi/man3/MPI_Comm_attach_buffer.3.rst
new file mode 100644
index 00000000000..1e93ddeed05
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Comm_attach_buffer.3.rst
@@ -0,0 +1,67 @@
+.. _mpi_comm_attach_buffer:
+
+MPI_Comm_attach_buffer
+======================
+
+.. include_body
+
+:ref:`MPI_Comm_attach_buffer` |mdash| Attaches a user-defined buffer to a communicator for sending.
+
+.. The following file was automatically generated
+.. include:: ./bindings/mpi_comm_attach_buffer.rst
+
+INPUT PARAMETERS
+----------------
+
+* ``comm``: Communicator (handle).
+* ``buf`` : Initial buffer address (choice).
+* ``size`` : Buffer size, in bytes (integer). Ignored if buf is MPI_BUFFER_AUTOMATIC.
+
+OUTPUT PARAMETER
+----------------
+
+* ``ierror`` : Fortran only: Error status (integer).
+
+DESCRIPTION
+-----------
+
+Provides to MPI a buffer to attach to a communicator to be used for buffering
+outgoing messages. The buffer is used only by messages sent in buffered
+mode using the supplied communicator. Only one buffer can be attached to a communicator at a time.
+
+NOTES
+-----
+
+When not using MPI_BUFFER_AUTOMATIC, the size given should be the sum
+of the sizes of all outstanding Bsends that you intend to have,
+plus MPI_BSEND_OVERHEAD bytes for each Bsend that you do. For the purposes of
+calculating size, you should use :ref:`MPI_Pack_size`. In other words, in the code
+
+.. code-block:: c
+
+   MPI_Comm_attach_buffer(MPI_COMM_WORLD, buf, size);
+   MPI_Bsend( ..., count=20, datatype=type1, ... );
+   MPI_Bsend( ..., count=40, datatype=type2, ... );
+
+the value of size in the :ref:`MPI_Comm_attach_buffer` call should be greater than
+the value computed by
+
+.. code-block:: c
+
+   MPI_Pack_size( 20, type1, comm, &s1 );
+   MPI_Pack_size( 40, type2, comm, &s2 );
+   size = s1 + s2 + 2 * MPI_BSEND_OVERHEAD;
+
+MPI_BSEND_OVERHEAD gives the maximum amount of buffer space that may be
+used by the Bsend routines. This value is in mpi.h for C and mpif.h for
+Fortran.
+
+ERRORS
+------
+
+.. include:: ./ERRORS.rst
+
+.. seealso::
+   * :ref:`MPI_Comm_detach_buffer`
+   * :ref:`MPI_Session_attach_buffer`
+   * :ref:`MPI_Buffer_attach`
diff --git a/docs/man-openmpi/man3/MPI_Comm_detach_buffer.3.rst b/docs/man-openmpi/man3/MPI_Comm_detach_buffer.3.rst
new file mode 100644
index 00000000000..e61624213d1
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Comm_detach_buffer.3.rst
@@ -0,0 +1,88 @@
+.. _mpi_comm_detach_buffer:
+
+MPI_Comm_detach_buffer
+======================
+
+.. include_body
+
+:ref:`MPI_Comm_detach_buffer` |mdash| Removes an existing buffer previously attached to the communicator (for use in :ref:`MPI_Bsend`,
+etc.)
+
+.. The following file was automatically generated
+.. include:: ./bindings/mpi_comm_detach_buffer.rst
+
+OUTPUT PARAMETERS
+-----------------
+
+* ``comm``: Communicator (handle).
+* ``buf`` : Initial buffer address (choice).
+* ``size`` : Buffer size, in bytes (integer). Undefined if MPI_BUFFER_AUTOMATIC was used when attaching.
+* ``ierror`` : Fortran only: Error status (integer).
+
+DESCRIPTION
+-----------
+
+Detach the buffer currently associated with the communicator. The call returns the
+address and the size of the detached buffer. This operation will block
+until all messages currently in the buffer have been transmitted. Upon
+return of this function, the user may reuse or deallocate the space
+taken by the buffer.
+
+Example: Calls to attach and detach buffers.
+
+.. code-block:: c
+
+   #define BUFFSIZE 10000
+
+   int size; char *buff;
+   MPI_Comm_attach_buffer(MPI_COMM_WORLD, malloc(BUFFSIZE), BUFFSIZE);
+
+   // a buffer of 10000 bytes can now be used by MPI_Bsend
+
+   MPI_Comm_detach_buffer(MPI_COMM_WORLD, &buff, &size); // Buffer size reduced to zero
+   MPI_Comm_attach_buffer(MPI_COMM_WORLD, buff, size); // Buffer of 10000 bytes available again
+
+NOTES
+-----
+
+The reason that :ref:`MPI_Comm_detach_buffer` returns the address and size of the
+buffer being detached is to allow nested libraries to replace and
+restore the buffer. For example, consider
+
+.. code-block:: c
+
+   int size, mysize, idummy;
+   void *ptr, *myptr, *dummy;
+   MPI_Comm_detach_buffer(MPI_COMM_WORLD, &ptr, &size );
+   MPI_Comm_attach_buffer(MPI_COMM_WORLD, myptr, mysize );
+
+   /* ... library code ... */
+
+   MPI_Comm_detach_buffer(MPI_COMM_WORLD, &dummy, &idummy );
+   MPI_Comm_attach_buffer(MPI_COMM_WORLD, ptr, size );
+
+This is much like the action of the UNIX signal routine and has the same
+strengths (it's simple) and weaknesses (it only works for nested
+usages).
+
+For Fortran: The Fortran binding for this routine is different. Because
+Fortran does not have pointers, it is impossible to provide a way to use
+the output of this routine to exchange buffers. In this case, only the
+size field is set. Note this does not apply when using the mpi_f08 bindings.
+
+For C: Even though the buf argument is declared as void, it is really
+the address of a void pointer. See Rationale, below, for more details.
+
+Even though the C functions :ref:`MPI_Comm_attach_buffer` and :ref:`MPI_Comm_detach_buffer` both
+have an argument of type void*, these arguments are used
+differently: A pointer to the buffer is passed to :ref:`MPI_Comm_attach_buffer`; the
+address of the pointer is passed to :ref:`MPI_Comm_detach_buffer`, so that this call
+can return the pointer value.
+
+ERRORS
+------
+
+.. include:: ./ERRORS.rst
+
+.. seealso::
+   * :ref:`MPI_Comm_attach_buffer`
diff --git a/docs/man-openmpi/man3/MPI_Comm_flush_buffer.3.rst b/docs/man-openmpi/man3/MPI_Comm_flush_buffer.3.rst
new file mode 100644
index 00000000000..02f8e1b1577
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Comm_flush_buffer.3.rst
@@ -0,0 +1,44 @@
+.. _mpi_comm_flush_buffer:
+
+MPI_Comm_flush_buffer
+======================
+
+.. include_body
+
+:ref:`MPI_Comm_flush_buffer`, :ref:`MPI_Comm_iflush_buffer` |mdash| Wait until all messages currently in
+the communicator-specific buffer of the calling MPI process have been transmitted.
+
+.. The following directive tells the man page generation script to
+   generate multiple bindings for this file.
+.. mpi-bindings: MPI_Comm_flush_buffer, MPI_Comm_iflush_buffer
+
+.. The following file was automatically generated
+.. include:: ./bindings/mpi_comm_flush_buffer.rst
+
+INPUT PARAMETERS
+----------------
+
+* ``comm``: Communicator (handle).
+
+OUTPUT PARAMETER
+----------------
+
+* ``ierror`` : Fortran only: Error status (integer).
+
+DESCRIPTION
+-----------
+
+:ref:`MPI_Comm_flush_buffer` will not return until all messages currently in the communicator-
+specific buffer of the calling MPI process have been transmitted. The non-blocking variant
+:ref:`MPI_Comm_iflush_buffer` returns an MPI request that can be queried using MPI request
+status functions to determine when all messages in the communicator-specific buffer of the
+calling MPI process have been transmitted.
+
+ERRORS
+------
+
+.. include:: ./ERRORS.rst
+
+.. seealso::
+   * :ref:`MPI_Comm_attach_buffer`
+   * :ref:`MPI_Comm_detach_buffer`
diff --git a/docs/man-openmpi/man3/MPI_Comm_iflush_buffer.3.rst b/docs/man-openmpi/man3/MPI_Comm_iflush_buffer.3.rst
new file mode 100644
index 00000000000..c7bdb2c66d4
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Comm_iflush_buffer.3.rst
@@ -0,0 +1,8 @@
+.. _mpi_comm_iflush_buffer:
+
+MPI_Comm_iflush_buffer
+======================
+   .. include_body
+
+.. include:: ../man3/MPI_Comm_flush_buffer.3.rst
+   :start-after: .. include_body
diff --git a/docs/man-openmpi/man3/MPI_Session_attach_buffer.3.rst b/docs/man-openmpi/man3/MPI_Session_attach_buffer.3.rst
new file mode 100644
index 00000000000..ad066be3fd9
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Session_attach_buffer.3.rst
@@ -0,0 +1,72 @@
+.. _mpi_session_attach_buffer:
+
+MPI_Session_attach_buffer
+=========================
+
+.. include_body
+
+:ref:`MPI_Session_attach_buffer` |mdash| Attaches a user-defined buffer to a session for sending.
+
+.. The following file was automatically generated
+.. include:: ./bindings/mpi_session_attach_buffer.rst
+
+INPUT PARAMETERS
+----------------
+
+* ``session``: Session (handle).
+* ``buf`` : Initial buffer address (choice).
+* ``size`` : Buffer size, in bytes (integer). Ignored if buf is MPI_BUFFER_AUTOMATIC.
+
+OUTPUT PARAMETER
+----------------
+
+* ``ierror`` : Fortran only: Error status (integer).
+
+DESCRIPTION
+-----------
+
+Provides to MPI a buffer to attach to a session to be used for buffering
+outgoing messages. The buffer is used only by messages sent in buffered
+mode using the supplied session. Only one buffer can be attached to a session at a time.
+
+NOTES
+-----
+
+This buffer is to be used for buffering outgoing messages sent when using a communicator
+that is created from a group that is derived from the session. However, if there
+is a communicator-specific buffer attached to the particular communicator at the time
+the buffered mode send is started, that buffer is used.
+
+When not using MPI_BUFFER_AUTOMATIC, the size given should be the sum
+of the sizes of all outstanding Bsends that you intend to have,
+plus MPI_BSEND_OVERHEAD bytes for each Bsend that you do. For the purposes of
+calculating size, you should use :ref:`MPI_Pack_size`. In other words, in the code
+
+.. code-block:: c
+
+   MPI_Session_attach_buffer(session, buf, size);
+   MPI_Bsend( ..., count=20, datatype=type1, ... );
+   MPI_Bsend( ..., count=40, datatype=type2, ... );
+
+the value of size in the :ref:`MPI_Session_attach_buffer` call should be greater than
+the value computed by
+
+.. code-block:: c
+
+   MPI_Pack_size( 20, type1, comm, &s1 );
+   MPI_Pack_size( 40, type2, comm, &s2 );
+   size = s1 + s2 + 2 * MPI_BSEND_OVERHEAD;
+
+MPI_BSEND_OVERHEAD gives the maximum amount of buffer space that may be
+used by the Bsend routines. This value is in mpi.h for C and mpif.h for
+Fortran.
+
+ERRORS
+------
+
+.. include:: ./ERRORS.rst
+
+.. seealso::
+   * :ref:`MPI_Session_detach_buffer`
+   * :ref:`MPI_Comm_attach_buffer`
+   * :ref:`MPI_Buffer_attach`
diff --git a/docs/man-openmpi/man3/MPI_Session_detach_buffer.3.rst b/docs/man-openmpi/man3/MPI_Session_detach_buffer.3.rst
new file mode 100644
index 00000000000..71088efc5fe
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Session_detach_buffer.3.rst
@@ -0,0 +1,88 @@
+.. _mpi_session_detach_buffer:
+
+MPI_Session_detach_buffer
+=========================
+
+.. include_body
+
+:ref:`MPI_Session_detach_buffer` |mdash| Removes an existing buffer previously attached to the session (for use in :ref:`MPI_Bsend`,
+etc.)
+
+.. The following file was automatically generated
+.. include:: ./bindings/mpi_session_detach_buffer.rst
+
+OUTPUT PARAMETERS
+-----------------
+
+* ``session``: Session (handle).
+* ``buf`` : Initial buffer address (choice).
+* ``size`` : Buffer size, in bytes (integer). Undefined if MPI_BUFFER_AUTOMATIC was used when attaching.
+* ``ierror`` : Fortran only: Error status (integer).
+
+DESCRIPTION
+-----------
+
+Detach the buffer currently associated with the session. The call returns the
+address and the size of the detached buffer. This operation will block
+until all messages currently in the buffer have been transmitted. Upon
+return of this function, the user may reuse or deallocate the space
+taken by the buffer.
+
+Example: Calls to attach and detach buffers.
+
+.. code-block:: c
+
+   #define BUFFSIZE 10000
+
+   int size; char *buff;
+   MPI_Session_attach_buffer(session, malloc(BUFFSIZE), BUFFSIZE);
+
+   // a buffer of 10000 bytes can now be used by MPI_Bsend
+
+   MPI_Session_detach_buffer(session, &buff, &size); // Buffer size reduced to zero
+   MPI_Session_attach_buffer(session, buff, size); // Buffer of 10000 bytes available again
+
+NOTES
+-----
+
+The reason that :ref:`MPI_Session_detach_buffer` returns the address and size of the
+buffer being detached is to allow nested libraries to replace and
+restore the buffer. For example, consider
+
+.. code-block:: c
+
+   int size, mysize, idummy;
+   void *ptr, *myptr, *dummy;
+   MPI_Session_detach_buffer(session, &ptr, &size );
+   MPI_Session_attach_buffer(session, myptr, mysize );
+
+   /* ... library code ... */
+
+   MPI_Session_detach_buffer(session, &dummy, &idummy );
+   MPI_Session_attach_buffer(session, ptr, size );
+
+This is much like the action of the UNIX signal routine and has the same
+strengths (it's simple) and weaknesses (it only works for nested
+usages).
+
+For Fortran: The Fortran binding for this routine is different. Because
+Fortran does not have pointers, it is impossible to provide a way to use
+the output of this routine to exchange buffers. In this case, only the
+size field is set. Note this does not apply when using the mpi_f08 bindings.
+
+For C: Even though the buf argument is declared as void, it is really
+the address of a void pointer. See Rationale, below, for more details.
+
+Even though the C functions :ref:`MPI_Session_attach_buffer` and :ref:`MPI_Session_detach_buffer` both
+have an argument of type void*, these arguments are used
+differently: A pointer to the buffer is passed to :ref:`MPI_Session_attach_buffer`; the
+address of the pointer is passed to :ref:`MPI_Session_detach_buffer`, so that this call
+can return the pointer value.
+
+ERRORS
+------
+
+.. include:: ./ERRORS.rst
+
+.. seealso::
+   * :ref:`MPI_Session_attach_buffer`
diff --git a/docs/man-openmpi/man3/MPI_Session_flush_buffer.3.rst b/docs/man-openmpi/man3/MPI_Session_flush_buffer.3.rst
new file mode 100644
index 00000000000..5cd2c049edc
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Session_flush_buffer.3.rst
@@ -0,0 +1,44 @@
+.. _mpi_session_flush_buffer:
+
+MPI_Session_flush_buffer
+========================
+
+.. include_body
+
+:ref:`MPI_Session_flush_buffer`, :ref:`MPI_Session_iflush_buffer` |mdash| Wait until all messages currently in the session-specific
+buffer of the calling MPI process have been transmitted.
+
+.. The following directive tells the man page generation script to
+   generate multiple bindings for this file.
+.. mpi-bindings: MPI_Session_flush_buffer, MPI_Session_iflush_buffer
+
+.. The following file was automatically generated
+.. include:: ./bindings/mpi_session_flush_buffer.rst
+
+INPUT PARAMETERS
+----------------
+
+* ``session``: Session (handle).
+
+OUTPUT PARAMETER
+----------------
+
+* ``ierror`` : Fortran only: Error status (integer).
+
+DESCRIPTION
+-----------
+
+:ref:`MPI_Session_flush_buffer` will not return until all messages currently in the session-specific
+buffer of the calling MPI process have been transmitted. The non-blocking variant
+:ref:`MPI_Session_iflush_buffer` returns an MPI request that can be queried using MPI request
+status functions to determine when all messages in the session-specific buffer of the
+calling MPI process have been transmitted.
+
+ERRORS
+------
+
+.. include:: ./ERRORS.rst
+
+.. seealso::
+   * :ref:`MPI_Session_attach_buffer`
+   * :ref:`MPI_Session_detach_buffer`
diff --git a/docs/man-openmpi/man3/MPI_Session_iflush_buffer.3.rst b/docs/man-openmpi/man3/MPI_Session_iflush_buffer.3.rst
new file mode 100644
index 00000000000..725cdf2d1d2
--- /dev/null
+++ b/docs/man-openmpi/man3/MPI_Session_iflush_buffer.3.rst
@@ -0,0 +1,8 @@
+.. _mpi_session_iflush_buffer:
+
+MPI_Session_iflush_buffer
+=========================
+   .. include_body
+
+.. include:: ../man3/MPI_Session_flush_buffer.3.rst
+   :start-after: .. include_body
diff --git a/docs/man-openmpi/man3/index.rst b/docs/man-openmpi/man3/index.rst
index 8b880c33967..695f8c9024f 100644
--- a/docs/man-openmpi/man3/index.rst
+++ b/docs/man-openmpi/man3/index.rst
@@ -36,6 +36,8 @@ MPI API manual pages (section 3)
    MPI_Bsend_init.3.rst
    MPI_Buffer_attach.3.rst
    MPI_Buffer_detach.3.rst
+   MPI_Buffer_flush.3.rst
+   MPI_Buffer_iflush.3.rst
    MPI_Cancel.3.rst
    MPI_Cart_coords.3.rst
    MPI_Cart_create.3.rst
@@ -47,6 +49,7 @@ MPI API manual pages (section 3)
    MPI_Cartdim_get.3.rst
    MPI_Close_port.3.rst
    MPI_Comm_accept.3.rst
+   MPI_Comm_attach_buffer.3.rst
    MPI_Comm_c2f.3.rst
    MPI_Comm_call_errhandler.3.rst
    MPI_Comm_compare.3.rst
@@ -57,10 +60,12 @@ MPI API manual pages (section 3)
    MPI_Comm_create_group.3.rst
    MPI_Comm_create_keyval.3.rst
    MPI_Comm_delete_attr.3.rst
+   MPI_Comm_detach_buffer.3.rst
    MPI_Comm_disconnect.3.rst
    MPI_Comm_dup.3.rst
    MPI_Comm_dup_with_info.3.rst
    MPI_Comm_f2c.3.rst
+   MPI_Comm_flush_buffer.3.rst
    MPI_Comm_free.3.rst
    MPI_Comm_free_keyval.3.rst
    MPI_Comm_get_attr.3.rst
@@ -71,6 +76,7 @@ MPI API manual pages (section 3)
    MPI_Comm_group.3.rst
    MPI_Comm_idup.3.rst
    MPI_Comm_idup_with_info.3.rst
+   MPI_Comm_iflush_buffer.3.rst
    MPI_Comm_join.3.rst
    MPI_Comm_rank.3.rst
    MPI_Comm_remote_group.3.rst
@@ -324,16 +330,20 @@ MPI API manual pages (section 3)
    MPI_Send_init.3.rst
    MPI_Sendrecv.3.rst
    MPI_Sendrecv_replace.3.rst
+   MPI_Session_attach_buffer.3.rst
    MPI_Session_c2f.3.rst
    MPI_Session_call_errhandler.3.rst
    MPI_Session_create_errhandler.3.rst
+   MPI_Session_detach_buffer.3.rst
    MPI_Session_f2c.3.rst
    MPI_Session_finalize.3.rst
+   MPI_Session_flush_buffer.3.rst
    MPI_Session_get_errhandler.3.rst
    MPI_Session_get_info.3.rst
    MPI_Session_get_nth_pset.3.rst
    MPI_Session_get_num_psets.3.rst
+   MPI_Session_iflush_buffer.3.rst
    MPI_Session_init.3.rst
    MPI_Session_set_errhandler.3.rst
    MPI_Sizeof.3.rst

From 95ce1d3c47f13cd7e4a9fd2e14ad7e6f7c1a3d41 Mon Sep 17 00:00:00 2001
From: Howard Pritchard
Date: Wed, 23 Jul 2025 09:34:05 -0600
Subject: [PATCH 4/4] man page fix

Signed-off-by: Howard Pritchard
---
 docs/man-openmpi/man3/MPI_Buffer_flush.3.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/man-openmpi/man3/MPI_Buffer_flush.3.rst b/docs/man-openmpi/man3/MPI_Buffer_flush.3.rst
index 8e41cf4098c..e712479baac 100644
--- a/docs/man-openmpi/man3/MPI_Buffer_flush.3.rst
+++ b/docs/man-openmpi/man3/MPI_Buffer_flush.3.rst
@@ -10,7 +10,7 @@ the MPI process specific buffer of the calling MPI process have been transmi
 .. The following directive tells the man page generation script to
    generate multiple bindings for this file.
-.. mpi-bindings: MPI_Comm_flush_buffer, MPI_Comm_iflush_buffer
+.. mpi-bindings: MPI_Buffer_flush, MPI_Buffer_iflush

 .. The following file was automatically generated
 .. include:: ./bindings/mpi_buffer_flush.rst
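
For completeness, the communicator-level analogue of the earlier session sketch, exercising the routines declared in mpi.h.in by patch 1. Again illustrative only, not part of the series; it assumes at least two ranks and omits error checking.

#include <stdlib.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, bufsize, s1, msg = 42;
    char *old;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* size the communicator-level buffer per the man page guidance above */
    MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &s1);
    bufsize = s1 + MPI_BSEND_OVERHEAD;
    MPI_Comm_attach_buffer(MPI_COMM_WORLD, malloc(bufsize), bufsize);

    if (0 == rank) {
        MPI_Bsend(&msg, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
        MPI_Comm_flush_buffer(MPI_COMM_WORLD);  /* wait for buffered sends to drain */
    } else if (1 == rank) {
        MPI_Recv(&msg, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }

    /* detach returns the buffer address so the caller can free it */
    MPI_Comm_detach_buffer(MPI_COMM_WORLD, &old, &bufsize);
    free(old);

    MPI_Finalize();
    return 0;
}

Per the mpi.h.in change in patch 1, MPI_BUFFER_AUTOMATIC may instead be passed as the buffer argument, in which case the size argument is ignored and the detached size is undefined, as noted in the man pages above.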