
Commit 4116ae3

Performance fix for FieldsIO (#538)
* Performance fixes for FieldsIO
* Added missing test markers
* Reverting to non-collective IO if needed
* Doing as much collective IO as possible also with unbalanced distributions
* Small cleanup
* TL: debug the full collective approach

Co-authored-by: Thibaut Lunet <[email protected]>
1 parent f4f4df9 commit 4116ae3

File tree: 2 files changed (+57, -22 lines)

  pySDC/helpers/fieldsIO.py
  pySDC/tests/test_helpers/test_fieldsIO.py


pySDC/helpers/fieldsIO.py

Lines changed: 50 additions & 15 deletions
@@ -45,9 +45,7 @@
 Warning
 -------
 To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring).
-Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, wether the code is run in parallel or not.
-
-> ⚠️ Also : this module can only be imported with **Python 3.11 or higher** !
+Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.
 """
 import os
 import numpy as np
@@ -202,7 +200,7 @@ def initialize(self):
         if not self.ALLOW_OVERWRITE:
             assert not os.path.isfile(
                 self.fileName
-            ), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"
+            ), f"file {self.fileName!r} already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"

         with open(self.fileName, "w+b") as f:
             self.hBase.tofile(f)
@@ -475,7 +473,7 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):

         Example
         -------
-        >>> # Suppose the FieldsIO object is already writen into outputs.pysdc
+        >>> # Suppose the FieldsIO object is already written into outputs.pysdc
         >>> import os
         >>> from pySDC.utils.fieldsIO import Rectilinear
         >>> os.makedirs("vtrFiles") # to store all VTR files into a subfolder
@@ -494,12 +492,13 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):
     # MPI-parallel implementation
     # -------------------------------------------------------------------------
     comm: MPI.Intracomm = None
+    _nCollectiveIO = None

     @classmethod
     def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
         """
         Setup the MPI mode for the files IO, considering a decomposition
-        of the 1D grid into contiuous subintervals.
+        of the 1D grid into contiguous subintervals.

         Parameters
         ----------
@@ -514,6 +513,20 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
         cls.iLoc = iLoc
         cls.nLoc = nLoc
         cls.mpiFile = None
+        cls._nCollectiveIO = None
+
+    @property
+    def nCollectiveIO(self):
+        """
+        Number of collective IO operations over all processes, when reading or writing a field.
+
+        Returns:
+        --------
+        int: Number of collective IO accesses
+        """
+        if self._nCollectiveIO is None:
+            self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX)
+        return self._nCollectiveIO

     @property
     def MPI_ON(self):
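The new `nCollectiveIO` property is what makes collective IO possible with unbalanced decompositions: each process counts how many contiguous blocks it will write or read (`nVar * prod(nLoc[:-1])`), and an allreduce with `MPI.MAX` gives every rank the largest of these counts, i.e. the number of collective calls all ranks must take part in. A minimal standalone sketch of that counting step (the function name and the example decomposition are illustrative, not pySDC code):

    # Illustrative sketch: how the collective-IO count is agreed on. Each rank owns
    # nVar * prod(nLoc[:-1]) contiguous blocks; the MAX over all ranks is the number
    # of collective calls every process must participate in.
    import numpy as np
    from mpi4py import MPI

    def count_collective_ops(comm, nVar, nLoc):
        nLocalBlocks = nVar * int(np.prod(nLoc[:-1]))    # blocks owned by this rank
        return comm.allreduce(nLocalBlocks, op=MPI.MAX)  # largest count wins

    if __name__ == "__main__":
        comm = MPI.COMM_WORLD
        # Unbalanced decomposition: rank 0 owns 3 rows of the first dimension, others 2.
        nLoc = (3, 16) if comm.Get_rank() == 0 else (2, 16)
        print(comm.Get_rank(), count_collective_ops(comm, nVar=2, nLoc=nLoc))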
@@ -541,7 +554,7 @@ def MPI_WRITE(self, data):
         """Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position."""
         self.mpiFile.Write(data)

-    def MPI_WRITE_AT(self, offset, data: np.ndarray):
+    def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray):
         """
         Write data in the binary file in MPI mode, with a given offset
         **relative to the beginning of the file**.
@@ -553,9 +566,9 @@ def MPI_WRITE_AT(self, offset, data: np.ndarray):
         data : np.ndarray
             Data to be written in the binary file.
         """
-        self.mpiFile.Write_at(offset, data)
+        self.mpiFile.Write_at_all(offset, data)

-    def MPI_READ_AT(self, offset, data):
+    def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
         """
         Read data from the binary file in MPI mode, with a given offset
         **relative to the beginning of the file**.
@@ -567,7 +580,7 @@ def MPI_READ_AT(self, offset, data):
         data : np.ndarray
             Array on which to read the data from the binary file.
         """
-        self.mpiFile.Read_at(offset, data)
+        self.mpiFile.Read_at_all(offset, data)

     def MPI_FILE_CLOSE(self):
         """Close the binary file in MPI mode"""
@@ -620,13 +633,22 @@ def addField(self, time, field):

         offset0 = self.fileSize
         self.MPI_FILE_OPEN(mode="a")
+        nWrites = 0
+        nCollectiveIO = self.nCollectiveIO
+
         if self.MPI_ROOT:
             self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
             offset0 += self.tSize

         for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
             offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_WRITE_AT(offset, field[iVar, *iBeg])
+            self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
+            nWrites += 1
+
+        for _ in range(nCollectiveIO - nWrites):
+            # Additional collective write to catch up with other processes
+            self.MPI_WRITE_AT_ALL(offset0, field[:0])
+
         self.MPI_FILE_CLOSE()

     def iPos(self, iVar, iX):
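This is the heart of the fix. With an unbalanced decomposition, some ranks own fewer `(iVar, iBeg)` blocks than others, so they would leave the loop early while the remaining ranks are still blocked inside `Write_at_all`, and the collective call would never complete. Counting the writes and padding with zero-size writes (`field[:0]`) guarantees that every rank issues exactly `nCollectiveIO` collective calls; `readField` below applies the same pattern to the reads. A simplified standalone sketch of the pattern (the helper name and its arguments are illustrative):

    # Every rank issues the same number of collective writes, padding with
    # zero-size buffers when it owns fewer blocks than the other ranks.
    import numpy as np
    from mpi4py import MPI

    def write_blocks_collectively(fh, comm, blocks, offsets):
        # blocks: numpy arrays owned by this rank; offsets: matching byte offsets
        nCollectiveIO = comm.allreduce(len(blocks), op=MPI.MAX)  # agreed-upon call count
        nWrites = 0
        for data, offset in zip(blocks, offsets):
            fh.Write_at_all(offset, data)    # real contribution of this rank
            nWrites += 1
        empty = np.empty(0, dtype=np.float64)
        for _ in range(nCollectiveIO - nWrites):
            # Zero-size writes keep this rank in step with ranks that own more
            # blocks, so nobody is left waiting inside a collective call.
            fh.Write_at_all(0, empty)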
@@ -669,9 +691,18 @@ def readField(self, idx):
         field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)

         self.MPI_FILE_OPEN(mode="r")
+        nReads = 0
+        nCollectiveIO = self.nCollectiveIO
+
         for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
             offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_READ_AT(offset, field[iVar, *iBeg])
+            self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
+            nReads += 1
+
+        for _ in range(nCollectiveIO - nReads):
+            # Additional collective read to catch up with other processes
+            self.MPI_READ_AT_ALL(offset0, field[:0])
+
         self.MPI_FILE_CLOSE()

         return t, field
@@ -684,7 +715,7 @@ def initGrid(nVar, gridSizes):
     dim = len(gridSizes)
     coords = [np.linspace(0, 1, num=n, endpoint=False) for n in gridSizes]
     s = [None] * dim
-    u0 = np.array(np.arange(nVar) + 1)[:, *s]
+    u0 = np.array(np.arange(nVar) + 1)[(slice(None), *s)]
     for x in np.meshgrid(*coords, indexing="ij"):
         u0 = u0 * x
     return coords, u0
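The indexing change here (and in `writeFields_MPI` below) is what makes the module importable on Python older than 3.11: `u0[:, *s]` relies on star-unpacking inside a subscript, a syntax only accepted from Python 3.11 on, while building the index tuple explicitly with `(slice(None), *s)` is equivalent and valid on older interpreters. This is also why the 3.11 warning in the module docstring and the version-based test skip could be removed. A small example of the equivalence:

    # Both subscripts select the same elements; the second spelling avoids the
    # star-unpacking-inside-brackets syntax that only Python >= 3.11 accepts.
    import numpy as np

    u0 = np.arange(24).reshape(2, 3, 4)
    s = [slice(0, 2), slice(1, 3)]

    a = u0[(slice(None), *s)]    # works on any supported Python version
    # b = u0[:, *s]              # same result, but a SyntaxError before Python 3.11
    assert a.shape == (2, 2, 2)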
@@ -706,8 +737,7 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
     iLoc, nLoc = blocks.localBounds
     Rectilinear.setupMPI(comm, iLoc, nLoc)
     s = [slice(i, i + n) for i, n in zip(iLoc, nLoc)]
-    u0 = u0[:, *s]
-    print(MPI_RANK, u0.shape)
+    u0 = u0[(slice(None), *s)]

     f1 = Rectilinear(DTYPES[dtypeIdx], fileName)
     f1.setHeader(nVar=nVar, coords=coords)
@@ -726,6 +756,11 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
 def compareFields_MPI(fileName, u0, nSteps):
     from pySDC.helpers.fieldsIO import FieldsIO

+    comm = MPI.COMM_WORLD
+    MPI_RANK = comm.Get_rank()
+    if MPI_RANK == 0:
+        print("Comparing fields with MPI")
+
     f2 = FieldsIO.fromFile(fileName)

     times = np.arange(nSteps) / nSteps
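Taken together, the changes in this file keep the user-facing workflow of `writeFields_MPI` unchanged: set up the decomposition, declare the global header, then add fields, with each rank passing only its local block. A hedged end-to-end sketch of that write path, modelled on `writeFields_MPI` above (the hand-rolled 1D decomposition and the call sequence `setHeader` → `initialize` → `addField` are assumptions for illustration, not an exact copy of pySDC's test helper):

    # Sketch of the parallel write path (run with e.g. mpirun -np 4 python demo.py).
    import numpy as np
    from mpi4py import MPI
    from pySDC.helpers.fieldsIO import Rectilinear

    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()

    nVar, nX, nY = 2, 32, 16
    coords = [np.linspace(0, 1, num=n, endpoint=False) for n in (nX, nY)]  # global grid

    # Contiguous (possibly unbalanced) 1D split of the first grid dimension.
    counts = [nX // size + (r < nX % size) for r in range(size)]
    iLoc = [sum(counts[:rank]), 0]          # start indices of the local block
    nLoc = [counts[rank], nY]               # size of the local block

    Rectilinear.setupMPI(comm, iLoc, nLoc)
    f = Rectilinear(np.float64, "outputs.pysdc")
    f.setHeader(nVar=nVar, coords=coords)   # global coordinates, on every rank
    f.initialize()

    uLocal = np.random.rand(nVar, *nLoc)    # each rank only holds its local block
    f.addField(0.0, uLocal)                 # collective write of the first field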

pySDC/tests/test_helpers/test_fieldsIO.py

Lines changed: 7 additions & 7 deletions
@@ -3,9 +3,6 @@
 import glob
 import pytest

-if sys.version_info < (3, 11):
-    pytest.skip("skipping fieldsIO tests on python lower than 3.11", allow_module_level=True)
-
 import itertools
 import numpy as np

@@ -14,6 +11,7 @@
 FieldsIO.ALLOW_OVERWRITE = True


+@pytest.mark.base
 @pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
 @pytest.mark.parametrize("dim", range(4))
 def testHeader(dim, dtypeIdx):
@@ -65,6 +63,7 @@ def testHeader(dim, dtypeIdx):
         assert np.allclose(val, f2.header[key]), f"header's discrepancy for {key} in written {f2}"


+@pytest.mark.base
 @pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
 @pytest.mark.parametrize("nSteps", [1, 2, 10, 100])
 @pytest.mark.parametrize("nVar", [1, 2, 5])
@@ -106,6 +105,7 @@ def testScalar(nVar, nSteps, dtypeIdx):
         assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


+@pytest.mark.base
 @pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
 @pytest.mark.parametrize("nSteps", [1, 2, 5, 10])
 @pytest.mark.parametrize("nVar", [1, 2, 5])
@@ -155,6 +155,7 @@ def testRectilinear(dim, nVar, nSteps, dtypeIdx):
         assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


+@pytest.mark.base
 @pytest.mark.parametrize("nSteps", [1, 10])
 @pytest.mark.parametrize("nZ", [1, 5, 16])
 @pytest.mark.parametrize("nY", [1, 5, 16])
@@ -249,8 +250,7 @@ def testRectilinear_MPI(dim, nProcs, dtypeIdx, algo, nSteps, nVar):
     parser.add_argument('--gridSizes', type=int, nargs='+', help="number of grid points in each dimensions")
     args = parser.parse_args()

-    if sys.version_info >= (3, 11):
-        from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI
+    from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI

-        u0 = writeFields_MPI(**args.__dict__)
-        compareFields_MPI(args.fileName, u0, args.nSteps)
+    u0 = writeFields_MPI(**args.__dict__)
+    compareFields_MPI(args.fileName, u0, args.nSteps)
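On the test side the changes are mostly mechanical: the Python 3.11 guards disappear (see the indexing note above), the `__main__` block now imports and runs the MPI helpers unconditionally, and the serial tests gain a `base` marker, so they can be selected with pytest's standard marker expression, for example:

    pytest -m base pySDC/tests/test_helpers/test_fieldsIO.py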
