Skip to content

ENH: Improved threadsafe messaging #1340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ find_package(span-lite CONFIG REQUIRED)
find_package(Eigen3 REQUIRED)
find_package(boost_mp11 CONFIG REQUIRED)
find_package(nod CONFIG REQUIRED)
find_package(spdlog CONFIG REQUIRED)

# -----------------------------------------------------------------------
# Find HDF5 and get the path to the DLL libraries and put that into a
Expand Down Expand Up @@ -265,6 +266,7 @@ target_link_libraries(simplnx
HDF5::HDF5
Boost::mp11
nod::nod
spdlog::spdlog
)

if(UNIX)
Expand Down Expand Up @@ -542,6 +544,7 @@ set(SIMPLNX_HDRS
${SIMPLNX_SOURCE_DIR}/Utilities/HistogramUtilities.hpp
${SIMPLNX_SOURCE_DIR}/Utilities/MaskCompareUtilities.hpp
${SIMPLNX_SOURCE_DIR}/Utilities/MemoryUtilities.hpp
${SIMPLNX_SOURCE_DIR}/Utilities/MessageHelper.hpp
${SIMPLNX_SOURCE_DIR}/Utilities/StringUtilities.hpp
${SIMPLNX_SOURCE_DIR}/Utilities/StringInterpretationUtilities.hpp
${SIMPLNX_SOURCE_DIR}/Utilities/IntersectionUtilities.hpp
Expand Down Expand Up @@ -745,6 +748,7 @@ set(SIMPLNX_SRCS
${SIMPLNX_SOURCE_DIR}/Utilities/DataStoreUtilities.cpp
${SIMPLNX_SOURCE_DIR}/Utilities/MaskCompareUtilities.cpp
${SIMPLNX_SOURCE_DIR}/Utilities/MemoryUtilities.cpp
${SIMPLNX_SOURCE_DIR}/Utilities/MessageHelper.cpp
${SIMPLNX_SOURCE_DIR}/Utilities/IParallelAlgorithm.cpp
${SIMPLNX_SOURCE_DIR}/Utilities/ParallelDataAlgorithm.cpp
${SIMPLNX_SOURCE_DIR}/Utilities/ParallelData2DAlgorithm.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,14 @@ Result<> AlignSectionsMisorientation::findShifts(std::vector<int64_t>& xShifts,

std::vector<LaueOps::Pointer> orientationOps = LaueOps::GetAllOrientationOps();

int32_t progInt = 0;

// Allocate a 2D Array which will be reused from slice to slice
std::vector<bool> misorients(dims[0] * dims[1], false);

const auto halfDim0 = static_cast<int64_t>(dims[0] * 0.5f);
const auto halfDim1 = static_cast<int64_t>(dims[1] * 0.5f);

double deg2Rad = (nx::core::numbers::pi / 180.0);
auto start = std::chrono::steady_clock::now();
ThrottledMessenger throttledMessenger = getMessageHelper().createThrottledMessenger();
if(m_InputValues->StoreAlignmentShifts)
{
auto& slicesStore = m_DataStructure.getDataAs<UInt32Array>(m_InputValues->SlicesArrayPath)->getDataStoreRef();
Expand All @@ -94,15 +92,7 @@ Result<> AlignSectionsMisorientation::findShifts(std::vector<int64_t>& xShifts,
{
return {};
}
progInt = static_cast<float>(iter) / static_cast<float>(dims[2]) * 100.0f;
auto now = std::chrono::steady_clock::now();
// Only send updates every 1 second
if(std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count() > 1000)
{
std::string message = fmt::format("Determining Shifts || {}% Complete", progInt);
m_MessageHandler(nx::core::IFilter::ProgressMessage{nx::core::IFilter::Message::Type::Info, message, progInt});
start = std::chrono::steady_clock::now();
}
throttledMessenger.sendThrottledMessage([&]() { return fmt::format("Determining Shifts || {:.2f}% Complete", CalculatePercentComplete(iter, dims[2])); });
if(getCancel())
{
return {};
Expand Down Expand Up @@ -210,15 +200,7 @@ Result<> AlignSectionsMisorientation::findShifts(std::vector<int64_t>& xShifts,
// Loop over the Z Direction
for(int64_t iter = 1; iter < dims[2]; iter++)
{
progInt = static_cast<float>(iter) / static_cast<float>(dims[2]) * 100.0f;
auto now = std::chrono::steady_clock::now();
// Only send updates every 1 second
if(std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count() > 1000)
{
std::string message = fmt::format("Determining Shifts || {}% Complete", progInt);
m_MessageHandler(nx::core::IFilter::ProgressMessage{nx::core::IFilter::Message::Type::Info, message, progInt});
start = std::chrono::steady_clock::now();
}
throttledMessenger.sendThrottledMessage([&]() { return fmt::format("Determining Shifts || {:.2f}% Complete", CalculatePercentComplete(iter, dims[2])); });
if(getCancel())
{
return {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "simplnx/DataStructure/DataGroup.hpp"
#include "simplnx/DataStructure/Geometry/ImageGeom.hpp"
#include "simplnx/Utilities/MaskCompareUtilities.hpp"
#include "simplnx/Utilities/MessageHelper.hpp"

#include "EbsdLib/Core/Orientation.hpp"
#include "EbsdLib/LaueOps/LaueOps.h"
Expand Down Expand Up @@ -80,18 +81,11 @@ Result<> BadDataNeighborOrientationCheck::operator()()

std::vector<int32_t> neighborCount(totalPoints, 0);

int64_t progressInt = 0;
auto start = std::chrono::steady_clock::now();
MessageHelper messageHelper(m_MessageHandler);
ThrottledMessenger throttledMessenger = messageHelper.createThrottledMessenger();
for(size_t i = 0; i < totalPoints; i++)
{
auto now = std::chrono::steady_clock::now();
if(std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count() > 1000)
{
progressInt = static_cast<int64_t>((static_cast<float>(i) / totalPoints) * 100.0f);
std::string ss = fmt::format("Processing Data '{}'% completed", progressInt);
m_MessageHandler({IFilter::Message::Type::Info, ss});
start = std::chrono::steady_clock::now();
}
throttledMessenger.sendThrottledMessage([&]() { return fmt::format("Processing Data {:.2f}% completed", CalculatePercentComplete(i, totalPoints)); });

if(!maskCompare->isTrue(i))
{
Expand Down Expand Up @@ -157,19 +151,12 @@ Result<> BadDataNeighborOrientationCheck::operator()()
while(counter > 0)
{
counter = 0;
progressInt = 0;
start = std::chrono::steady_clock::now();
for(size_t i = 0; i < totalPoints; i++)
{
auto now = std::chrono::steady_clock::now();
if(std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count() > 1000)
{
progressInt = static_cast<int64_t>((static_cast<float>(i) / totalPoints) * 100.0f);
std::string ss =
fmt::format("Level '{}' of '{}' || Processing Data ('{}') '{}'% completed", (startLevel - currentLevel) + 1, startLevel - m_InputValues->NumberOfNeighbors, loopNumber, progressInt);
m_MessageHandler({IFilter::Message::Type::Info, ss});
start = std::chrono::steady_clock::now();
}
throttledMessenger.sendThrottledMessage([&]() {
return fmt::format("Level '{}' of '{}' || Processing Data ('{}') {:.2f}% completed", (startLevel - currentLevel) + 1, startLevel - m_InputValues->NumberOfNeighbors, loopNumber,
CalculatePercentComplete(i, totalPoints));
});

if(neighborCount[i] >= currentLevel && !maskCompare->isTrue(i))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "simplnx/DataStructure/DataArray.hpp"
#include "simplnx/DataStructure/DataGroup.hpp"
#include "simplnx/Utilities/Math/MatrixMath.hpp"
#include "simplnx/Utilities/MessageHelper.hpp"
#include "simplnx/Utilities/ParallelDataAlgorithm.hpp"
#include "simplnx/Utilities/TimeUtilities.hpp"

Expand Down Expand Up @@ -389,11 +390,13 @@ Result<> ComputeGBCD::operator()()
SizeGBCD sizeGbcd(triangleChunkSize, k_NumMisoReps, m_InputValues->GBCDRes);
int32 totalGBCDBins = sizeGbcd.m_GbcdSizes[0] * sizeGbcd.m_GbcdSizes[1] * sizeGbcd.m_GbcdSizes[2] * sizeGbcd.m_GbcdSizes[3] * sizeGbcd.m_GbcdSizes[4] * 2;

MessageHelper messageHelper(m_MessageHandler);

// create an array to hold the total face area for each phase and initialize the array to 0.0
std::vector<double> totalFaceArea(totalPhases, 0.0);
std::string ss = fmt::format("1/2 Starting GBCD Calculation and Summation Phase");
m_MessageHandler({IFilter::Message::Type::Info, ss});
auto startMillis = std::chrono::steady_clock::now();
auto startTime = std::chrono::steady_clock::now();
messageHelper.sendMessage("1/2 Starting GBCD Calculation and Summation Phase");
ThrottledMessenger throttledMessenger = messageHelper.createThrottledMessenger();

for(usize i = 0; i < totalFaces; i = i + triangleChunkSize)
{
Expand Down Expand Up @@ -448,19 +451,17 @@ Result<> ComputeGBCD::operator()()
}
}

auto currentMillis = std::chrono::steady_clock::now();
if(std::chrono::duration_cast<std::chrono::milliseconds>(currentMillis - startMillis).count() > 1000)
{
throttledMessenger.sendThrottledMessage([&]() {
auto currentTime = throttledMessenger.getLastTime();
const usize k_LastTriangleIndex = i + triangleChunkSize;
float32 currentRate = static_cast<float32>(triangleChunkSize) / static_cast<float32>(std::chrono::duration_cast<std::chrono::milliseconds>(currentMillis - startMillis).count());
float32 currentRate = static_cast<float32>(triangleChunkSize) / static_cast<float32>(std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - startTime).count());
uint64 estimatedTime = static_cast<uint64>(totalFaces - k_LastTriangleIndex) / currentRate;
ss = fmt::format("Calculating GBCD || Triangles {}/{} Completed || Est. Time Remain: {}", k_LastTriangleIndex, totalFaces, ConvertMillisToHrsMinSecs(estimatedTime));
startMillis = std::chrono::steady_clock::now();
m_MessageHandler({IFilter::Message::Type::Info, ss});
}
startTime = currentTime;
return fmt::format("Calculating GBCD || Triangles {}/{} Completed || Est. Time Remain: {}", k_LastTriangleIndex, totalFaces, ConvertMillisToHrsMinSecs(estimatedTime));
});
}

m_MessageHandler({IFilter::Message::Type::Info, "2/2 Starting GBCD Normalization Phase"});
messageHelper.sendMessage("2/2 Starting GBCD Normalization Phase");

for(int32 i = 0; i < totalPhases; i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "simplnx/DataStructure/DataArray.hpp"
#include "simplnx/DataStructure/DataGroup.hpp"
#include "simplnx/DataStructure/Geometry/ImageGeom.hpp"
#include "simplnx/Utilities/MessageHelper.hpp"
#include "simplnx/Utilities/ParallelData3DAlgorithm.hpp"

#include "EbsdLib/LaueOps/LaueOps.h"
Expand All @@ -17,9 +18,9 @@ namespace
class FindKernelAvgMisorientationsImpl
{
public:
FindKernelAvgMisorientationsImpl(ComputeKernelAvgMisorientations* filter, DataStructure& dataStructure, const ComputeKernelAvgMisorientationsInputValues* inputValues,
FindKernelAvgMisorientationsImpl(ProgressMessageHelper& progressMessenger, DataStructure& dataStructure, const ComputeKernelAvgMisorientationsInputValues* inputValues,
const std::atomic_bool& shouldCancel)
: m_Filter(filter)
: m_ProgressMessageHelper(progressMessenger)
, m_DataStructure(dataStructure)
, m_InputValues(inputValues)
, m_ShouldCancel(shouldCancel)
Expand Down Expand Up @@ -54,7 +55,8 @@ class FindKernelAvgMisorientationsImpl
// messenger values
usize counter = 0;
usize increment = (zEnd - zStart) / 100;
auto start = std::chrono::steady_clock::now();

ProgressMessenger progressMessenger = m_ProgressMessageHelper.createProgressMessenger();

auto xPoints = static_cast<int64_t>(udims[0]);
auto yPoints = static_cast<int64_t>(udims[1]);
Expand All @@ -68,13 +70,8 @@ class FindKernelAvgMisorientationsImpl

if(counter > increment)
{
auto now = std::chrono::steady_clock::now();
if(std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count() > 1000)
{
m_Filter->sendThreadSafeProgressMessage(counter);
counter = 0;
start = std::chrono::steady_clock::now();
}
progressMessenger.sendProgressMessage(counter);
counter = 0;
}

for(size_t row = yStart; row < yEnd; row++)
Expand Down Expand Up @@ -145,7 +142,7 @@ class FindKernelAvgMisorientationsImpl
}
}
}
m_Filter->sendThreadSafeProgressMessage(counter);
progressMessenger.sendProgressMessage(counter);
}

void operator()(const Range3D& range) const
Expand All @@ -154,7 +151,7 @@ class FindKernelAvgMisorientationsImpl
}

private:
ComputeKernelAvgMisorientations* m_Filter = nullptr;
ProgressMessageHelper& m_ProgressMessageHelper;
DataStructure& m_DataStructure;
const ComputeKernelAvgMisorientationsInputValues* m_InputValues = nullptr;
const std::atomic_bool& m_ShouldCancel;
Expand All @@ -175,40 +172,17 @@ ComputeKernelAvgMisorientations::ComputeKernelAvgMisorientations(DataStructure&
// -----------------------------------------------------------------------------
ComputeKernelAvgMisorientations::~ComputeKernelAvgMisorientations() noexcept = default;

// -----------------------------------------------------------------------------
const std::atomic_bool& ComputeKernelAvgMisorientations::getCancel()
{
return m_ShouldCancel;
}

// -----------------------------------------------------------------------------
void ComputeKernelAvgMisorientations::sendThreadSafeProgressMessage(usize counter)
{
std::lock_guard<std::mutex> guard(m_ProgressMessage_Mutex);

m_ProgressCounter += counter;
auto now = std::chrono::steady_clock::now();
if(std::chrono::duration_cast<std::chrono::milliseconds>(now - m_InitialPoint).count() < 1000)
{
return;
}

auto progressInt = static_cast<usize>((static_cast<float32>(m_ProgressCounter) / static_cast<float32>(m_TotalElements)) * 100.0f);
std::string ss = fmt::format("Finding Kernel Average Misorientations || {}%", progressInt);
m_MessageHandler(IFilter::Message::Type::Info, ss);

m_LastProgressInt = progressInt;
m_InitialPoint = std::chrono::steady_clock::now();
}

// -----------------------------------------------------------------------------
Result<> ComputeKernelAvgMisorientations::operator()()
{
auto* gridGeom = m_DataStructure.getDataAs<ImageGeom>(m_InputValues->InputImageGeometry);
SizeVec3 udims = gridGeom->getDimensions();

// set up threadsafe messenger
m_TotalElements = udims[2] * udims[1] * udims[0];
MessageHelper messageHelper(m_MessageHandler);
ProgressMessageHelper progressMessageHelper = messageHelper.createProgressMessageHelper();

progressMessageHelper.setMaxProgresss(udims[2] * udims[1] * udims[0]);
progressMessageHelper.setProgressMessageTemplate("Finding Kernel Average Misorientations || {:.2f}%");

typename IParallelAlgorithm::AlgorithmArrays algArrays;
algArrays.push_back(m_DataStructure.getDataAs<IDataArray>(m_InputValues->CellPhasesArrayPath));
Expand All @@ -220,7 +194,7 @@ Result<> ComputeKernelAvgMisorientations::operator()()
ParallelData3DAlgorithm parallelAlgorithm;
parallelAlgorithm.setRange(Range3D(0, udims[0], 0, udims[1], 0, udims[2]));
parallelAlgorithm.requireArraysInMemory(algArrays);
parallelAlgorithm.execute(FindKernelAvgMisorientationsImpl(this, m_DataStructure, m_InputValues, m_ShouldCancel));
parallelAlgorithm.execute(FindKernelAvgMisorientationsImpl(progressMessageHelper, m_DataStructure, m_InputValues, m_ShouldCancel));

return {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,11 @@ class ORIENTATIONANALYSIS_EXPORT ComputeKernelAvgMisorientations

Result<> operator()();

const std::atomic_bool& getCancel();

void sendThreadSafeProgressMessage(usize counter);

private:
DataStructure& m_DataStructure;
const ComputeKernelAvgMisorientationsInputValues* m_InputValues = nullptr;
const std::atomic_bool& m_ShouldCancel;
const IFilter::MessageHandler& m_MessageHandler;

// Thread safe Progress Message
std::chrono::steady_clock::time_point m_InitialPoint = std::chrono::steady_clock::now();
mutable std::mutex m_ProgressMessage_Mutex;
size_t m_TotalElements = 0;
size_t m_ProgressCounter = 0;
size_t m_LastProgressInt = 0;
};

} // namespace nx::core
Loading
Loading