Skip to content

support for mpi_comm_attach_buffer and friends #13338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion ompi/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2018-2024 Triad National Security, LLC. All rights
* Copyright (c) 2018-2025 Triad National Security, LLC. All rights
* reserved.
* Copyright (c) 2023 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2024 NVIDIA Corporation. All rights reserved.
Expand Down Expand Up @@ -345,6 +345,9 @@ struct ompi_communicator_t {
/* instance that this comm belongs to */
ompi_instance_t* instance;

/* pointer to buffer object used for buffered sends */
void *bsend_buffer;

#if OPAL_ENABLE_FT_MPI
/** agreement caching info for topology and previous returned decisions */
opal_object_t *agreement_specific;
Expand Down Expand Up @@ -775,6 +778,18 @@ static inline bool ompi_comm_iface_create_check(ompi_communicator_t *comm, int *
return ompi_comm_iface_coll_check(comm, err);
}

static inline void *ompi_comm_bsend_buffer_get(ompi_communicator_t *comm)
{
assert(NULL != comm);
return comm->bsend_buffer;
}

static inline int ompi_comm_bsend_buffer_set(ompi_communicator_t *comm, void *buffer)
{
comm->bsend_buffer = buffer;
return OMPI_SUCCESS;
}

/*
* Communicator creation support collectives
* - Agreement style allreduce
Expand Down
29 changes: 29 additions & 0 deletions ompi/include/mpi.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ typedef MPI_Win_errhandler_function MPI_Win_errhandler_fn
#define MPI_WEIGHTS_EMPTY ((int *) 3) /* empty weights */
#define MPI_BOTTOM ((void *) 0) /* base reference address */
#define MPI_IN_PLACE ((void *) 1) /* in place buffer */
#define MPI_BUFFER_AUTOMATIC ((void *) 4) /* MPI_BUFFER_AUTOMATIC for buffer attach funcs */
#define MPI_BSEND_OVERHEAD 128 /* size of bsend header + ptr */
#define MPI_MAX_INFO_KEY OPAL_MAX_INFO_KEY /* max info key length */
#define MPI_MAX_INFO_VAL OPAL_MAX_INFO_VAL /* max info value length */
Expand Down Expand Up @@ -1574,6 +1575,8 @@ OMPI_DECLSPEC int MPI_Buffer_attach(void *buffer, int size);
OMPI_DECLSPEC int MPI_Buffer_attach_c(void *buffer, MPI_Count size);
OMPI_DECLSPEC int MPI_Buffer_detach(void *buffer, int *size);
OMPI_DECLSPEC int MPI_Buffer_detach_c(void *buffer, MPI_Count *size);
OMPI_DECLSPEC int MPI_Buffer_flush(void);
OMPI_DECLSPEC int MPI_Buffer_iflush(MPI_Request *request);
OMPI_DECLSPEC int MPI_Cancel(MPI_Request *request);
OMPI_DECLSPEC int MPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[]);
OMPI_DECLSPEC int MPI_Cart_create(MPI_Comm old_comm, int ndims, const int dims[],
Expand Down Expand Up @@ -1637,11 +1640,17 @@ OMPI_DECLSPEC int MPI_Dist_graph_neighbors_count(MPI_Comm comm,
int *inneighbors,
int *outneighbors,
int *weighted);
OMPI_DECLSPEC int MPI_Comm_attach_buffer(MPI_Comm comm, void *buffer, int size);
OMPI_DECLSPEC int MPI_Comm_attach_buffer_c(MPI_Comm comm, void *buffer, MPI_Count size);
OMPI_DECLSPEC int MPI_Comm_detach_buffer(MPI_Comm comm, void *buffer_addr, int *size);
OMPI_DECLSPEC int MPI_Comm_detach_buffer_c(MPI_Comm comm, void *buffer_addr, MPI_Count *size);
OMPI_DECLSPEC int MPI_Comm_flush_buffer(MPI_Comm comm);
OMPI_DECLSPEC int MPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler *erhandler);
OMPI_DECLSPEC int MPI_Comm_get_info(MPI_Comm comm, MPI_Info *info_used);
OMPI_DECLSPEC int MPI_Comm_get_name(MPI_Comm comm, char *comm_name, int *resultlen);
OMPI_DECLSPEC int MPI_Comm_get_parent(MPI_Comm *parent);
OMPI_DECLSPEC int MPI_Comm_group(MPI_Comm comm, MPI_Group *group);
OMPI_DECLSPEC int MPI_Comm_iflush_buffer(MPI_Comm comm, MPI_Request *request);
OMPI_DECLSPEC int MPI_Comm_join(int fd, MPI_Comm *intercomm);
OMPI_DECLSPEC int MPI_Comm_rank(MPI_Comm comm, int *rank);
OMPI_DECLSPEC int MPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group);
Expand Down Expand Up @@ -2331,15 +2340,21 @@ OMPI_DECLSPEC int MPI_Sendrecv_replace_c(void * buf, MPI_Count count, MPI_Datat
int dest, int sendtag, int source, int recvtag,
MPI_Comm comm, MPI_Status *status);
OMPI_DECLSPEC MPI_Fint MPI_Session_c2f (const MPI_Session session);
OMPI_DECLSPEC int MPI_Session_attach_buffer(MPI_Session session, void *buffer, int size);
OMPI_DECLSPEC int MPI_Session_attach_buffer_c(MPI_Session session, void *buffer, MPI_Count size);
OMPI_DECLSPEC int MPI_Session_call_errhandler(MPI_Session session, int errorcode);
OMPI_DECLSPEC int MPI_Session_create_errhandler (MPI_Session_errhandler_function *session_errhandler_fn,
MPI_Errhandler *errhandler);
OMPI_DECLSPEC int MPI_Session_detach_buffer(MPI_Session session, void *buffer_addr, int *size);
OMPI_DECLSPEC int MPI_Session_detach_buffer_c(MPI_Session session, void *buffer_addr, MPI_Count *size);
OMPI_DECLSPEC int MPI_Session_finalize (MPI_Session *session);
OMPI_DECLSPEC int MPI_Session_flush_buffer(MPI_Session session);
OMPI_DECLSPEC int MPI_Session_get_errhandler(MPI_Session session, MPI_Errhandler *erhandler);
OMPI_DECLSPEC int MPI_Session_get_info (MPI_Session session, MPI_Info *info_used);
OMPI_DECLSPEC int MPI_Session_get_num_psets (MPI_Session session, MPI_Info info, int *npset_names);
OMPI_DECLSPEC int MPI_Session_get_nth_pset (MPI_Session session, MPI_Info info, int n, int *len, char *pset_name);
OMPI_DECLSPEC int MPI_Session_get_pset_info (MPI_Session session, const char *pset_name, MPI_Info *info_used);
OMPI_DECLSPEC int MPI_Session_iflush_buffer(MPI_Session session, MPI_Request *request);
OMPI_DECLSPEC int MPI_Session_init (MPI_Info info, MPI_Errhandler errhandler,
MPI_Session *session);
OMPI_DECLSPEC MPI_Session MPI_Session_f2c (MPI_Fint session);
Expand Down Expand Up @@ -2733,6 +2748,8 @@ OMPI_DECLSPEC int PMPI_Buffer_attach(void *buffer, int size);
OMPI_DECLSPEC int PMPI_Buffer_attach_c(void *buffer, MPI_Count size);
OMPI_DECLSPEC int PMPI_Buffer_detach(void *buffer, int *size);
OMPI_DECLSPEC int PMPI_Buffer_detach_c(void *buffer, MPI_Count *size);
OMPI_DECLSPEC int PMPI_Buffer_flush(void);
OMPI_DECLSPEC int PMPI_Buffer_iflush(MPI_Request *request);
OMPI_DECLSPEC int PMPI_Cancel(MPI_Request *request);
OMPI_DECLSPEC int PMPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[]);
OMPI_DECLSPEC int PMPI_Cart_create(MPI_Comm old_comm, int ndims, const int dims[],
Expand Down Expand Up @@ -2796,11 +2813,17 @@ OMPI_DECLSPEC int PMPI_Dist_graph_neighbors_count(MPI_Comm comm,
int *inneighbors,
int *outneighbors,
int *weighted);
OMPI_DECLSPEC int PMPI_Comm_attach_buffer(MPI_Comm comm, void *buffer, int size);
OMPI_DECLSPEC int PMPI_Comm_attach_buffer_c(MPI_Comm comm, void *buffer, MPI_Count size);
OMPI_DECLSPEC int PMPI_Comm_detach_buffer(MPI_Comm comm, void *buffer_addr, int *size);
OMPI_DECLSPEC int PMPI_Comm_detach_buffer_c(MPI_Comm comm, void *buffer_addr, MPI_Count *size);
OMPI_DECLSPEC int PMPI_Comm_flush_buffer(MPI_Comm comm);
OMPI_DECLSPEC int PMPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler *erhandler);
OMPI_DECLSPEC int PMPI_Comm_get_info(MPI_Comm comm, MPI_Info *info_used);
OMPI_DECLSPEC int PMPI_Comm_get_name(MPI_Comm comm, char *comm_name, int *resultlen);
OMPI_DECLSPEC int PMPI_Comm_get_parent(MPI_Comm *parent);
OMPI_DECLSPEC int PMPI_Comm_group(MPI_Comm comm, MPI_Group *group);
OMPI_DECLSPEC int PMPI_Comm_iflush_buffer(MPI_Comm comm, MPI_Request *request);
OMPI_DECLSPEC int PMPI_Comm_join(int fd, MPI_Comm *intercomm);
OMPI_DECLSPEC int PMPI_Comm_rank(MPI_Comm comm, int *rank);
OMPI_DECLSPEC int PMPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group);
Expand Down Expand Up @@ -3490,15 +3513,21 @@ OMPI_DECLSPEC int PMPI_Sendrecv_replace_c(void * buf, MPI_Count count, MPI_Data
int dest, int sendtag, int source, int recvtag,
MPI_Comm comm, MPI_Status *status);
OMPI_DECLSPEC MPI_Fint PMPI_Session_c2f (const MPI_Session session);
OMPI_DECLSPEC int PMPI_Session_attach_buffer(MPI_Session session, void *buffer, int size);
OMPI_DECLSPEC int PMPI_Session_attach_buffer_c(MPI_Session session, void *buffer, MPI_Count size);
OMPI_DECLSPEC int PMPI_Session_call_errhandler(MPI_Session session, int errorcode);
OMPI_DECLSPEC int PMPI_Session_create_errhandler (MPI_Session_errhandler_function *session_errhandler_fn,
MPI_Errhandler *errhandler);
OMPI_DECLSPEC int PMPI_Session_detach_buffer(MPI_Session session, void *buffer_addr, int *size);
OMPI_DECLSPEC int PMPI_Session_detach_buffer_c(MPI_Session session, void *buffer_addr, MPI_Count *size);
OMPI_DECLSPEC int PMPI_Session_finalize (MPI_Session *session);
OMPI_DECLSPEC int PMPI_Session_flush_buffer(MPI_Session session);
OMPI_DECLSPEC int PMPI_Session_get_errhandler(MPI_Session session, MPI_Errhandler *erhandler);
OMPI_DECLSPEC int PMPI_Session_get_info (MPI_Session session, MPI_Info *info_used);
OMPI_DECLSPEC int PMPI_Session_get_num_psets (MPI_Session session, MPI_Info info, int *npset_names);
OMPI_DECLSPEC int PMPI_Session_get_nth_pset (MPI_Session session, MPI_Info info, int n, int *len, char *pset_name);
OMPI_DECLSPEC int PMPI_Session_get_pset_info (MPI_Session session, const char *pset_name, MPI_Info *info_used);
OMPI_DECLSPEC int PMPI_Session_iflush_buffer(MPI_Session session, MPI_Request *request);
OMPI_DECLSPEC int PMPI_Session_init (MPI_Info info, MPI_Errhandler errhandler,
MPI_Session *session);
OMPI_DECLSPEC MPI_Session PMPI_Session_f2c (MPI_Fint session);
Expand Down
2 changes: 1 addition & 1 deletion ompi/instance/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ static int ompi_mpi_instance_finalize_common (void)
/* As finalize is the last legal MPI call, we are allowed to force the release
* of the user buffer used for bsend, before going anywhere further.
*/
(void) mca_pml_base_bsend_detach (NULL, NULL);
(void) mca_pml_base_bsend_detach(BASE_BSEND_BUF, NULL, NULL, NULL);

/* Shut down any bindings-specific issues: C++, F77, F90 */

Expand Down
18 changes: 17 additions & 1 deletion ompi/instance/instance.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2018 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2018-2025 Triad National Security, LLC. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -41,6 +41,10 @@ struct ompi_instance_t {

ompi_errhandler_t *error_handler;
ompi_errhandler_type_t errhandler_type;

/* pointer to buffer object used for buffered sends */
void *bsend_buffer;

};

typedef struct ompi_instance_t ompi_instance_t;
Expand Down Expand Up @@ -164,4 +168,16 @@ static inline int ompi_instance_invalid (const ompi_instance_t* instance)
return false;
}

static inline void *ompi_instance_bsend_buffer_get(ompi_instance_t *instance)
{
assert(NULL != instance);
return instance->bsend_buffer;
}

static inline int ompi_instance_bsend_buffer_set(ompi_instance_t *instance, void *buffer)
{
instance->bsend_buffer = buffer;
return OMPI_SUCCESS;
}

#endif /* !defined(OMPI_INSTANCE_H) */
Loading
Loading