
Commit b84d2a3

Merge pull request #1018 from PowerGridModel/feature/cleanup-main-model-batch-dispatch
Cleanup main model: Job dispatch into separate file
2 parents 48be6e3 + 31cee52 commit b84d2a3

4 files changed: +285 -233 lines

Lines changed: 256 additions & 0 deletions
@@ -0,0 +1,256 @@
// SPDX-FileCopyrightText: Contributors to the Power Grid Model project <[email protected]>
//
// SPDX-License-Identifier: MPL-2.0

#pragma once

#include "main_model_fwd.hpp"

#include "main_core/calculation_info.hpp"
#include "main_core/update.hpp"

#include <thread>

namespace power_grid_model {

template <class MainModel, class... ComponentType> class JobDispatch {
  private:
    using SequenceIdxView = std::array<std::span<Idx2D const>, main_core::utils::n_types<ComponentType...>>;

  public:
    static constexpr Idx ignore_output{-1};
    static constexpr Idx sequential{-1};

    template <typename Calculate>
        requires std::invocable<std::remove_cvref_t<Calculate>, MainModel&, MutableDataset const&, Idx>
    static BatchParameter batch_calculation_(MainModel& model, CalculationInfo& calculation_info,
                                             Calculate&& calculation_fn, MutableDataset const& result_data,
                                             ConstDataset const& update_data, Idx threading = sequential) {
        // if the update dataset is empty without any component
        // execute one power flow in the current instance, no batch calculation is needed
        if (update_data.empty()) {
            std::forward<Calculate>(calculation_fn)(model, result_data, 0);
            return BatchParameter{};
        }

        // get batch size
        Idx const n_scenarios = update_data.batch_size();

        // if the batch_size is zero, it is a special case without doing any calculations at all
        // we consider in this case the batch set is independent but not topology cacheable
        if (n_scenarios == 0) {
            return BatchParameter{};
        }

        // calculate once to cache topology, ignore results, all math solvers are initialized
        try {
            calculation_fn(model,
                           {
                               false,
                               1,
                               "sym_output",
                               model.meta_data(),
                           },
                           ignore_output);
        } catch (SparseMatrixError const&) { // NOLINT(bugprone-empty-catch) // NOSONAR
            // missing entries are provided in the update data
        } catch (NotObservableError const&) { // NOLINT(bugprone-empty-catch) // NOSONAR
            // missing entries are provided in the update data
        }

        // error messages
        std::vector<std::string> exceptions(n_scenarios, "");
        std::vector<CalculationInfo> infos(n_scenarios);

        // lambda for sub batch calculation
        main_core::utils::SequenceIdx<ComponentType...> all_scenarios_sequence;
        auto sub_batch = sub_batch_calculation_(model, std::forward<Calculate>(calculation_fn), result_data,
                                                update_data, all_scenarios_sequence, exceptions, infos);

        job_dispatch(sub_batch, n_scenarios, threading);

        handle_batch_exceptions(exceptions);
        calculation_info = main_core::merge_calculation_info(infos);

        return BatchParameter{};
    }

    template <typename Calculate>
        requires std::invocable<std::remove_cvref_t<Calculate>, MainModel&, MutableDataset const&, Idx>
    static auto sub_batch_calculation_(MainModel const& base_model, Calculate&& calculation_fn,
                                       MutableDataset const& result_data, ConstDataset const& update_data,
                                       main_core::utils::SequenceIdx<ComponentType...>& all_scenarios_sequence,
                                       std::vector<std::string>& exceptions, std::vector<CalculationInfo>& infos) {
        // cache component update order where possible.
        // the order for a cacheable (independent) component by definition is the same across all scenarios
        auto const components_to_update = base_model.get_components_to_update(update_data);
        auto const update_independence = main_core::update::independence::check_update_independence<ComponentType...>(
            base_model.state(), update_data);
        all_scenarios_sequence = main_core::update::get_all_sequence_idx_map<ComponentType...>(
            base_model.state(), update_data, 0, components_to_update, update_independence, false);

        return [&base_model, &exceptions, &infos, calculation_fn_ = std::forward<Calculate>(calculation_fn),
                &result_data, &update_data, &all_scenarios_sequence_ = std::as_const(all_scenarios_sequence),
                components_to_update, update_independence](Idx start, Idx stride, Idx n_scenarios) {
            assert(n_scenarios <= narrow_cast<Idx>(exceptions.size()));
            assert(n_scenarios <= narrow_cast<Idx>(infos.size()));

            Timer const t_total(infos[start], 0000, "Total in thread");

            auto const copy_model_functor = [&base_model, &infos](Idx scenario_idx) {
                Timer const t_copy_model_functor(infos[scenario_idx], 1100, "Copy model");
                return MainModel{base_model};
            };
            auto model = copy_model_functor(start);

            auto current_scenario_sequence_cache = main_core::utils::SequenceIdx<ComponentType...>{};
            auto [setup, winddown] =
                scenario_update_restore(model, update_data, components_to_update, update_independence,
                                        all_scenarios_sequence_, current_scenario_sequence_cache, infos);

            auto calculate_scenario = JobDispatch::call_with<Idx>(
                [&model, &calculation_fn_, &result_data, &infos](Idx scenario_idx) {
                    calculation_fn_(model, result_data, scenario_idx);
                    infos[scenario_idx].merge(model.calculation_info());
                },
                std::move(setup), std::move(winddown), scenario_exception_handler(model, exceptions, infos),
                [&model, &copy_model_functor](Idx scenario_idx) { model = copy_model_functor(scenario_idx); });

            for (Idx scenario_idx = start; scenario_idx < n_scenarios; scenario_idx += stride) {
                Timer const t_total_single(infos[scenario_idx], 0100, "Total single calculation in thread");

                calculate_scenario(scenario_idx);
            }
        };
    }

    // run sequential if
    //    specified threading < 0
    //    use hardware threads, but it is either unknown (0) or only has one thread (1)
    //    specified threading = 1
    template <typename RunSubBatchFn>
        requires std::invocable<std::remove_cvref_t<RunSubBatchFn>, Idx /*start*/, Idx /*stride*/, Idx /*n_scenarios*/>
    static void job_dispatch(RunSubBatchFn sub_batch, Idx n_scenarios, Idx threading) {
        // run batches sequential or parallel
        auto const hardware_thread = static_cast<Idx>(std::thread::hardware_concurrency());
        if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) {
            // run all in sequential
            sub_batch(0, 1, n_scenarios);
        } else {
            // create parallel threads
            Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_scenarios);
            std::vector<std::thread> threads;
            threads.reserve(n_thread);
            for (Idx thread_number = 0; thread_number < n_thread; ++thread_number) {
                // compute each sub batch with stride
                threads.emplace_back(sub_batch, thread_number, n_thread, n_scenarios);
            }
            for (auto& thread : threads) {
                thread.join();
            }
        }
    }

    template <typename... Args, typename RunFn, typename SetupFn, typename WinddownFn, typename HandleExceptionFn,
              typename RecoverFromBadFn>
        requires std::invocable<std::remove_cvref_t<RunFn>, Args const&...> &&
                 std::invocable<std::remove_cvref_t<SetupFn>, Args const&...> &&
                 std::invocable<std::remove_cvref_t<WinddownFn>, Args const&...> &&
                 std::invocable<std::remove_cvref_t<HandleExceptionFn>, Args const&...> &&
                 std::invocable<std::remove_cvref_t<RecoverFromBadFn>, Args const&...>
    static auto call_with(RunFn run, SetupFn setup, WinddownFn winddown, HandleExceptionFn handle_exception,
                          RecoverFromBadFn recover_from_bad) {
        return [setup_ = std::move(setup), run_ = std::move(run), winddown_ = std::move(winddown),
                handle_exception_ = std::move(handle_exception),
                recover_from_bad_ = std::move(recover_from_bad)](Args const&... args) {
            try {
                setup_(args...);
                run_(args...);
                winddown_(args...);
            } catch (...) {
                handle_exception_(args...);
                try {
                    winddown_(args...);
                } catch (...) {
                    recover_from_bad_(args...);
                }
            }
        };
    }

    static auto scenario_update_restore(
        MainModel& model, ConstDataset const& update_data,
        main_core::utils::ComponentFlags<ComponentType...> const& components_to_store,
        main_core::update::independence::UpdateIndependence<ComponentType...> const& do_update_cache,
        main_core::utils::SequenceIdx<ComponentType...> const& all_scenario_sequence,
        main_core::utils::SequenceIdx<ComponentType...>& current_scenario_sequence_cache,
        std::vector<CalculationInfo>& infos) noexcept {
        main_core::utils::ComponentFlags<ComponentType...> independence_flags{};
        std::ranges::transform(do_update_cache, independence_flags.begin(),
                               [](auto const& comp) { return comp.is_independent(); });
        auto const scenario_sequence = [&all_scenario_sequence, &current_scenario_sequence_cache,
                                        independence_flags_ = std::move(independence_flags)]() -> SequenceIdxView {
            return main_core::utils::run_functor_with_all_types_return_array<ComponentType...>(
                [&all_scenario_sequence, &current_scenario_sequence_cache, &independence_flags_]<typename CT>() {
                    constexpr auto comp_idx = main_core::utils::index_of_component<CT, ComponentType...>;
                    if (std::get<comp_idx>(independence_flags_)) {
                        return std::span<Idx2D const>{std::get<comp_idx>(all_scenario_sequence)};
                    }
                    return std::span<Idx2D const>{std::get<comp_idx>(current_scenario_sequence_cache)};
                });
        };

        return std::make_pair(
            [&model, &update_data, scenario_sequence, &current_scenario_sequence_cache, &components_to_store,
             do_update_cache_ = std::move(do_update_cache), &infos](Idx scenario_idx) {
                Timer const t_update_model(infos[scenario_idx], 1200, "Update model");
                current_scenario_sequence_cache = main_core::update::get_all_sequence_idx_map<ComponentType...>(
                    model.state(), update_data, scenario_idx, components_to_store, do_update_cache_, true);

                model.template update_components<cached_update_t>(update_data, scenario_idx, scenario_sequence());
            },
            [&model, scenario_sequence, &current_scenario_sequence_cache, &infos](Idx scenario_idx) {
                Timer const t_update_model(infos[scenario_idx], 1201, "Restore model");

                model.restore_components(scenario_sequence());
                std::ranges::for_each(current_scenario_sequence_cache,
                                      [](auto& comp_seq_idx) { comp_seq_idx.clear(); });
            });
    }

    // Lippincott pattern
    static auto scenario_exception_handler(MainModel& model, std::vector<std::string>& messages,
                                           std::vector<CalculationInfo>& infos) {
        return [&model, &messages, &infos](Idx scenario_idx) {
            std::exception_ptr const ex_ptr = std::current_exception();
            try {
                std::rethrow_exception(ex_ptr);
            } catch (std::exception const& ex) {
                messages[scenario_idx] = ex.what();
            } catch (...) {
                messages[scenario_idx] = "unknown exception";
            }
            infos[scenario_idx].merge(model.calculation_info());
        };
    }

    static void handle_batch_exceptions(std::vector<std::string> const& exceptions) {
        std::string combined_error_message;
        IdxVector failed_scenarios;
        std::vector<std::string> err_msgs;
        for (Idx batch = 0; batch < static_cast<Idx>(exceptions.size()); ++batch) {
            // append exception if it is not empty
            if (!exceptions[batch].empty()) {
                combined_error_message =
                    std::format("{}Error in batch #{}: {}\n", combined_error_message, batch, exceptions[batch]);
                failed_scenarios.push_back(batch);
                err_msgs.push_back(exceptions[batch]);
            }
        }
        if (!combined_error_message.empty()) {
            throw BatchCalculationError(combined_error_message, failed_scenarios, err_msgs);
        }
    }
};

} // namespace power_grid_model
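
How the dispatch itself works: job_dispatch above runs the sub-batch worker either once (sequentially) or on n_thread threads, where worker k of n covers scenarios k, k + n, k + 2n, ..., so every scenario is handled exactly once without a shared work queue. Below is a minimal, self-contained sketch of that strided pattern; the dispatch and sub_batch names and the touched bookkeeping are illustrative stand-ins, not part of the committed API.

#include <algorithm>
#include <cstdint>
#include <thread>
#include <vector>

using Idx = std::int64_t; // stand-in for power_grid_model::Idx

// Strided dispatch, mirroring JobDispatch::job_dispatch:
// threading < 0 or == 1 (or == 0 with fewer than 2 hardware threads) -> sequential,
// otherwise one thread per stride over the scenarios.
void dispatch(auto const& sub_batch, Idx n_scenarios, Idx threading) {
    auto const hardware_thread = static_cast<Idx>(std::thread::hardware_concurrency());
    if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) {
        sub_batch(0, 1, n_scenarios); // one worker, stride 1
        return;
    }
    Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_scenarios);
    std::vector<std::thread> threads;
    threads.reserve(n_thread);
    for (Idx k = 0; k < n_thread; ++k) {
        threads.emplace_back(sub_batch, k, n_thread, n_scenarios); // worker k, stride n_thread
    }
    for (auto& thread : threads) {
        thread.join();
    }
}

int main() {
    std::vector<int> touched(8, 0);
    auto sub_batch = [&touched](Idx start, Idx stride, Idx n_scenarios) {
        for (Idx scenario = start; scenario < n_scenarios; scenario += stride) {
            ++touched[scenario]; // each scenario is visited by exactly one worker
        }
    };
    dispatch(sub_batch, static_cast<Idx>(touched.size()), 0); // 0: use hardware concurrency
}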

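Per scenario, call_with wraps the update-calculate-restore sequence so that the restore (winddown) still runs when the calculation throws; the Lippincott-style handler translates whatever was thrown into a stored message for that scenario, and only if restoring also fails is the model rebuilt from the base copy. The following is a minimal sketch of that combination, with hypothetical stand-ins for the model setup, winddown, and recovery steps; the simplified call_with omits the requires clauses of the real one.

#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified call_with: setup, run, winddown; on failure, record the exception,
// still try to wind down, and fall back to a full recovery if that also throws.
template <typename... Args, typename RunFn, typename SetupFn, typename WinddownFn, typename HandleExceptionFn,
          typename RecoverFromBadFn>
auto call_with(RunFn run, SetupFn setup, WinddownFn winddown, HandleExceptionFn handle_exception,
               RecoverFromBadFn recover_from_bad) {
    return [=](Args const&... args) {
        try {
            setup(args...);
            run(args...);
            winddown(args...);
        } catch (...) {
            handle_exception(args...); // record the in-flight exception
            try {
                winddown(args...); // still try to restore
            } catch (...) {
                recover_from_bad(args...); // last resort: rebuild from scratch
            }
        }
    };
}

int main() {
    std::vector<std::string> messages(3);

    // Lippincott-style handler: rethrow the current exception once and translate every
    // exception type into a stored message, as scenario_exception_handler does above.
    auto handle_exception = [&messages](int scenario) {
        try {
            std::rethrow_exception(std::current_exception());
        } catch (std::exception const& ex) {
            messages[scenario] = ex.what();
        } catch (...) {
            messages[scenario] = "unknown exception";
        }
    };

    auto calculate_scenario = call_with<int>(
        [](int scenario) {
            if (scenario == 1) {
                throw std::runtime_error{"calculation diverged"}; // pretend scenario 1 fails
            }
        },
        [](int) { /* setup: apply the scenario update to the model */ },
        [](int) { /* winddown: restore the model to its base state */ },
        handle_exception,
        [](int) { /* recover_from_bad: re-copy the base model */ });

    for (int scenario = 0; scenario < 3; ++scenario) {
        calculate_scenario(scenario);
    }
    for (auto const& message : messages) {
        std::cout << (message.empty() ? "ok" : message) << '\n'; // prints: ok, calculation diverged, ok
    }
}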
power_grid_model_c/power_grid_model/include/power_grid_model/main_core/calculation_info.hpp

Lines changed: 2 additions & 0 deletions
@@ -4,7 +4,9 @@
 
 #pragma once
 
+#include "../common/calculation_info.hpp"
 #include "../common/common.hpp"
+#include "../common/timer.hpp"
 
 namespace power_grid_model::main_core {

power_grid_model_c/power_grid_model/include/power_grid_model/main_core/update.hpp

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@
 #include "state.hpp"
 
 #include "../all_components.hpp"
+#include "../auxiliary/dataset.hpp"
 #include "../common/iterator_facade.hpp"
 #include "../container.hpp"
