@@ -9,6 +9,7 @@
 #include "main_core/calculation_info.hpp"
 #include "main_core/update.hpp"
 
+#include <mutex>
 #include <thread>
 
 namespace power_grid_model {
@@ -60,27 +61,35 @@ template <class MainModel, class... ComponentType> class JobDispatch {
 
         // error messages
         std::vector<std::string> exceptions(n_scenarios, "");
-        std::vector<CalculationInfo> infos(n_scenarios);
+
+        // thread-safe handling of calculation info
+        std::mutex calculation_info_mutex;
+        auto const thread_safe_add_calculation_info = [&calculation_info,
+                                                       &calculation_info_mutex](CalculationInfo const& info) {
+            std::lock_guard const lock{calculation_info_mutex};
+            main_core::merge_into(calculation_info, info);
+        };
 
         // lambda for sub batch calculation
         main_core::utils::SequenceIdx<ComponentType...> all_scenarios_sequence;
-        auto sub_batch = sub_batch_calculation_(model, std::forward<Calculate>(calculation_fn), result_data,
-                                                update_data, all_scenarios_sequence, exceptions, infos);
+        auto sub_batch =
+            sub_batch_calculation_(model, std::forward<Calculate>(calculation_fn), result_data, update_data,
+                                   all_scenarios_sequence, exceptions, thread_safe_add_calculation_info);
 
         job_dispatch(sub_batch, n_scenarios, threading);
 
         handle_batch_exceptions(exceptions);
-        calculation_info = main_core::merge_calculation_info(infos);
 
         return BatchParameter{};
     }
 
-    template <typename Calculate>
+    template <typename Calculate, typename AddCalculationInfo>
         requires std::invocable<std::remove_cvref_t<Calculate>, MainModel&, MutableDataset const&, Idx>
     static auto sub_batch_calculation_(MainModel const& base_model, Calculate&& calculation_fn,
                                        MutableDataset const& result_data, ConstDataset const& update_data,
                                        main_core::utils::SequenceIdx<ComponentType...>& all_scenarios_sequence,
-                                       std::vector<std::string>& exceptions, std::vector<CalculationInfo>& infos) {
+                                       std::vector<std::string>& exceptions,
+                                       AddCalculationInfo&& thread_safe_add_calculation_info) {
         // cache component update order where possible.
         // the order for a cacheable (independent) component by definition is the same across all scenarios
         auto const components_to_update = base_model.get_components_to_update(update_data);
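
The new `thread_safe_add_calculation_info` lambda replaces the pre-sized `infos` vector: each worker thread accumulates its own `CalculationInfo` and merges it into the shared `calculation_info` exactly once, under a mutex. A minimal standalone sketch of the same guarded-merge pattern, using a plain `std::map` counter in place of `CalculationInfo` (all names here are illustrative, not part of the library):

```cpp
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

int main() {
    // Shared accumulator, analogous to calculation_info above.
    std::map<std::string, double> merged;
    std::mutex merged_mutex;

    // Mirrors thread_safe_add_calculation_info: one locked merge per thread.
    auto const thread_safe_merge = [&merged, &merged_mutex](std::map<std::string, double> const& local) {
        std::lock_guard const lock{merged_mutex};
        for (auto const& [key, value] : local) {
            merged[key] += value;
        }
    };

    std::vector<std::thread> threads;
    for (int t = 0; t != 4; ++t) {
        threads.emplace_back([&thread_safe_merge] {
            std::map<std::string, double> local; // thread-local info, no locking needed here
            local["scenarios"] += 1.0;           // pretend per-scenario work happened
            thread_safe_merge(local);            // single synchronized merge at thread end
        });
    }
    for (auto& thread : threads) {
        thread.join();
    }
}
```
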
@@ -89,56 +98,55 @@ template <class MainModel, class... ComponentType> class JobDispatch {
         all_scenarios_sequence = main_core::update::get_all_sequence_idx_map<ComponentType...>(
             base_model.state(), update_data, 0, components_to_update, update_independence, false);
 
-        return [&base_model, &exceptions, &infos, calculation_fn_ = std::forward<Calculate>(calculation_fn),
-                &result_data, &update_data, &all_scenarios_sequence_ = std::as_const(all_scenarios_sequence),
-                components_to_update, update_independence](Idx start, Idx stride, Idx n_scenarios) {
+        return [&base_model, &exceptions, &thread_safe_add_calculation_info,
+                calculation_fn_ = std::forward<Calculate>(calculation_fn), &result_data, &update_data,
+                &all_scenarios_sequence_ = std::as_const(all_scenarios_sequence), components_to_update,
+                update_independence](Idx start, Idx stride, Idx n_scenarios) {
             assert(n_scenarios <= narrow_cast<Idx>(exceptions.size()));
-            assert(n_scenarios <= narrow_cast<Idx>(infos.size()));
 
-            Timer const t_total(infos[start], 0000, "Total in thread");
+            CalculationInfo thread_info;
 
-            auto const copy_model_functor = [&base_model, &infos](Idx scenario_idx) {
-                Timer const t_copy_model_functor(infos[scenario_idx], 1100, "Copy model");
+            Timer t_total(thread_info, 0000, "Total in thread");
+
+            auto const copy_model_functor = [&base_model, &thread_info] {
+                Timer const t_copy_model_functor(thread_info, 1100, "Copy model");
                 return MainModel{base_model};
             };
-            auto model = copy_model_functor(start);
+            auto model = copy_model_functor();
 
             auto current_scenario_sequence_cache = main_core::utils::SequenceIdx<ComponentType...>{};
             auto [setup, winddown] =
                 scenario_update_restore(model, update_data, components_to_update, update_independence,
-                                        all_scenarios_sequence_, current_scenario_sequence_cache, infos);
+                                        all_scenarios_sequence_, current_scenario_sequence_cache, thread_info);
 
             auto calculate_scenario = JobDispatch::call_with<Idx>(
-                [&model, &calculation_fn_, &result_data, &infos](Idx scenario_idx) {
+                [&model, &calculation_fn_, &result_data, &thread_info](Idx scenario_idx) {
                     calculation_fn_(model, result_data, scenario_idx);
-                    infos[scenario_idx].merge(model.calculation_info());
+                    main_core::merge_into(thread_info, model.calculation_info());
                 },
-                std::move(setup), std::move(winddown), scenario_exception_handler(model, exceptions, infos),
-                [&model, &copy_model_functor](Idx scenario_idx) { model = copy_model_functor(scenario_idx); });
+                std::move(setup), std::move(winddown), scenario_exception_handler(model, exceptions, thread_info),
+                [&model, &copy_model_functor](Idx /*scenario_idx*/) { model = copy_model_functor(); });
 
             for (Idx scenario_idx = start; scenario_idx < n_scenarios; scenario_idx += stride) {
-                Timer const t_total_single(infos[scenario_idx], 0100, "Total single calculation in thread");
-
+                Timer const t_total_single(thread_info, 0100, "Total single calculation in thread");
                 calculate_scenario(scenario_idx);
             }
+
+            t_total.stop();
+            thread_safe_add_calculation_info(thread_info);
         };
     }
 
-    // run sequential if
-    //    specified threading < 0
-    //    use hardware threads, but it is either unknown (0) or only has one thread (1)
-    //    specified threading = 1
     template <typename RunSubBatchFn>
         requires std::invocable<std::remove_cvref_t<RunSubBatchFn>, Idx /*start*/, Idx /*stride*/, Idx /*n_scenarios*/>
     static void job_dispatch(RunSubBatchFn sub_batch, Idx n_scenarios, Idx threading) {
         // run batches sequential or parallel
-        auto const hardware_thread = static_cast<Idx>(std::thread::hardware_concurrency());
-        if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) {
+        auto const n_thread = n_threads(n_scenarios, threading);
+        if (n_thread == 1) {
             // run all in sequential
             sub_batch(0, 1, n_scenarios);
         } else {
             // create parallel threads
-            Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_scenarios);
             std::vector<std::thread> threads;
             threads.reserve(n_thread);
             for (Idx thread_number = 0; thread_number < n_thread; ++thread_number) {
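
Within the reworked sub-batch lambda, each thread now owns a single `thread_info` that is merged into the shared result via `thread_safe_add_calculation_info` after `t_total.stop()`, and `job_dispatch` hands scenarios out in a strided round-robin: thread `k` of `n` processes scenarios `k, k + n, k + 2n, ...`. A small self-contained sketch of that dispatch shape, with illustrative names (not the library's API):

```cpp
#include <thread>
#include <vector>

// Strided round-robin dispatch: thread k handles scenarios k, k + n_thread, k + 2 * n_thread, ...
template <typename SubBatch> void dispatch(SubBatch sub_batch, int n_scenarios, int n_thread) {
    std::vector<std::thread> threads;
    threads.reserve(n_thread);
    for (int thread_number = 0; thread_number != n_thread; ++thread_number) {
        // each thread gets its own start offset; the stride is the thread count
        threads.emplace_back(sub_batch, thread_number, n_thread, n_scenarios);
    }
    for (auto& thread : threads) {
        thread.join();
    }
}

int main() {
    dispatch(
        [](int start, int stride, int n_scenarios) {
            for (int scenario = start; scenario < n_scenarios; scenario += stride) {
                // every scenario index is visited by exactly one thread
            }
        },
        10, 3);
}
```
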
@@ -151,6 +159,18 @@ template <class MainModel, class... ComponentType> class JobDispatch {
         }
     }
 
+    // run sequential if
+    //    specified threading < 0
+    //    use hardware threads, but it is either unknown (0) or only has one thread (1)
+    //    specified threading = 1
+    static Idx n_threads(Idx n_scenarios, Idx threading) {
+        auto const hardware_thread = static_cast<Idx>(std::thread::hardware_concurrency());
+        if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) {
+            return 1; // sequential
+        }
+        return std::min(threading == 0 ? hardware_thread : threading, n_scenarios);
+    }
+
     template <typename... Args, typename RunFn, typename SetupFn, typename WinddownFn, typename HandleExceptionFn,
               typename RecoverFromBadFn>
         requires std::invocable<std::remove_cvref_t<RunFn>, Args const&...> &&
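
The new `n_threads` helper centralizes the sequential-versus-parallel decision that previously lived inside `job_dispatch`: negative `threading` or `threading == 1` forces sequential execution, `threading == 0` means "use hardware concurrency" (falling back to sequential when that is unknown or 1), and the result is capped at the number of scenarios. A standalone restatement of that logic with a few illustrative calls (the `Idx` alias is a stand-in, and the results assume a machine reporting 8 hardware threads):

```cpp
#include <algorithm>
#include <cstdint>
#include <thread>

using Idx = std::int64_t; // stand-in for the library's Idx

// Same decision logic as the n_threads helper added above.
Idx n_threads(Idx n_scenarios, Idx threading) {
    auto const hardware_thread = static_cast<Idx>(std::thread::hardware_concurrency());
    if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) {
        return 1; // sequential
    }
    return std::min(threading == 0 ? hardware_thread : threading, n_scenarios);
}

int main() {
    n_threads(100, -1); // 1: sequential explicitly requested
    n_threads(100, 1);  // 1: a single thread is sequential
    n_threads(100, 0);  // 8: use hardware concurrency
    n_threads(100, 4);  // 4: explicit thread count
    n_threads(3, 0);    // 3: never more threads than scenarios
}
```
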
@@ -184,7 +204,7 @@ template <class MainModel, class... ComponentType> class JobDispatch {
         main_core::update::independence::UpdateIndependence<ComponentType...> const& do_update_cache,
         main_core::utils::SequenceIdx<ComponentType...> const& all_scenario_sequence,
         main_core::utils::SequenceIdx<ComponentType...>& current_scenario_sequence_cache,
-        std::vector<CalculationInfo>& infos) noexcept {
+        CalculationInfo& info) noexcept {
         main_core::utils::ComponentFlags<ComponentType...> independence_flags{};
         std::ranges::transform(do_update_cache, independence_flags.begin(),
                                [](auto const& comp) { return comp.is_independent(); });
@@ -202,15 +222,15 @@ template <class MainModel, class... ComponentType> class JobDispatch {
 
         return std::make_pair(
             [&model, &update_data, scenario_sequence, &current_scenario_sequence_cache, &components_to_store,
-             do_update_cache_ = std::move(do_update_cache), &infos](Idx scenario_idx) {
-                Timer const t_update_model(infos[scenario_idx], 1200, "Update model");
+             do_update_cache_ = std::move(do_update_cache), &info](Idx scenario_idx) {
+                Timer const t_update_model(info, 1200, "Update model");
                 current_scenario_sequence_cache = main_core::update::get_all_sequence_idx_map<ComponentType...>(
                     model.state(), update_data, scenario_idx, components_to_store, do_update_cache_, true);
 
                 model.template update_components<cached_update_t>(update_data, scenario_idx, scenario_sequence());
             },
-            [&model, scenario_sequence, &current_scenario_sequence_cache, &infos](Idx scenario_idx) {
-                Timer const t_update_model(infos[scenario_idx], 1201, "Restore model");
+            [&model, scenario_sequence, &current_scenario_sequence_cache, &info](Idx /*scenario_idx*/) {
+                Timer const t_update_model(info, 1201, "Restore model");
 
                 model.restore_components(scenario_sequence());
                 std::ranges::for_each(current_scenario_sequence_cache,
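
`scenario_update_restore` keeps returning a `std::make_pair` of callables: the first applies a scenario's update to the model, the second restores the model afterwards, and both now record their timings in the single per-thread `info`. A tiny sketch of that setup/winddown pairing around a per-scenario run (all names illustrative):

```cpp
#include <string>
#include <utility>
#include <vector>

int main() {
    std::vector<std::string> log;

    // Pair of callables: the first mutates state for a scenario, the second restores it.
    auto [setup, winddown] = std::make_pair(
        [&log](int scenario_idx) { log.push_back("update scenario " + std::to_string(scenario_idx)); },
        [&log](int /*scenario_idx*/) { log.push_back("restore model"); });

    for (int scenario_idx = 0; scenario_idx != 3; ++scenario_idx) {
        setup(scenario_idx);
        // ... run the calculation for this scenario ...
        winddown(scenario_idx);
    }
}
```
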
@@ -220,8 +240,8 @@ template <class MainModel, class... ComponentType> class JobDispatch {
 
     // Lippincott pattern
     static auto scenario_exception_handler(MainModel& model, std::vector<std::string>& messages,
-                                           std::vector<CalculationInfo>& infos) {
-        return [&model, &messages, &infos](Idx scenario_idx) {
+                                           CalculationInfo& info) {
+        return [&model, &messages, &info](Idx scenario_idx) {
             std::exception_ptr const ex_ptr = std::current_exception();
             try {
                 std::rethrow_exception(ex_ptr);
@@ -230,7 +250,7 @@ template <class MainModel, class... ComponentType> class JobDispatch {
             } catch (...) {
                 messages[scenario_idx] = "unknown exception";
             }
-            infos[scenario_idx].merge(model.calculation_info());
+            info.merge(model.calculation_info());
         };
     }
 
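
`scenario_exception_handler` now writes its timings into the per-thread `info` instead of a per-scenario `infos` slot. The handler itself follows the Lippincott pattern: it rethrows the currently active exception inside a fresh try/catch so the type-to-message translation lives in one place. A minimal generic sketch of that idiom (function and message names are illustrative, not the library's API):

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Lippincott-style central handler: call it where an exception is active
// (e.g. inside a catch block) to translate the exception into a message.
std::string describe_current_exception() {
    try {
        std::rethrow_exception(std::current_exception());
    } catch (std::exception const& e) {
        return e.what();
    } catch (...) {
        return "unknown exception";
    }
}

int main() {
    std::string message;
    try {
        throw std::runtime_error{"scenario failed"};
    } catch (...) {
        message = describe_current_exception(); // message == "scenario failed"
    }
}
```
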