Skip to content

Commit 4f5c87e

Browse files
authored
Merge pull request #34 from pellse/33-add-bridge-to-indirectly-retrieve-correlationid-for-each-rule
33 add bridge to indirectly retrieve correlationid for each rule
2 parents 7db9db6 + fe33b70 commit 4f5c87e

File tree

21 files changed

+620
-283
lines changed

21 files changed

+620
-283
lines changed

assembler-cache-caffeine/src/test/java/io/github/pellse/assembler/caching/caffeine/AssemblerCaffeineCacheTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ record CDCAdd(OrderItem item) implements CDC {
304304
record CDCDelete(OrderItem item) implements CDC {
305305
}
306306

307-
BillingInfo updatedBillingInfo2 = new BillingInfo(2L, 2L, "4540111111111111");
307+
BillingInfo updatedBillingInfo2 = new BillingInfo(2, 2L, "4540111111111111");
308308

309309
Flux<Updated<BillingInfo>> billingInfoEventFlux = Flux.just(
310310
updated(billingInfo1), updated(billingInfo2), updated(billingInfo3), updated(updatedBillingInfo2))

assembler-spring-cache/src/test/java/io/github/pellse/assembler/caching/spring/SpringCacheAssemblerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ record CDCAdd(OrderItem item) implements CDC {
318318
record CDCDelete(OrderItem item) implements CDC {
319319
}
320320

321-
BillingInfo updatedBillingInfo2 = new BillingInfo(2L, 2L, "4540111111111111");
321+
BillingInfo updatedBillingInfo2 = new BillingInfo(2, 2L, "4540111111111111");
322322

323323
Flux<CacheEvent.Updated<BillingInfo>> billingInfoEventFlux = Flux.just(
324324
updated(billingInfo1), updated(billingInfo2), updated(billingInfo3), updated(updatedBillingInfo2))

assembler/src/main/java/io/github/pellse/assembler/AssemblerBuilder.java

Lines changed: 86 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,24 @@ static <R> WithCorrelationIdResolverBuilder<R> assemblerOf(@SuppressWarnings("un
3636
return AssemblerBuilder::withCorrelationIdResolver;
3737
}
3838

39-
static <T, ID, R> WithRulesBuilder<T, ID, R> withCorrelationIdResolver(Function<T, ID> correlationIdResolver) {
39+
static <T, K, R> WithRulesBuilder<T, K, R> withCorrelationIdResolver(Function<T, K> correlationIdResolver) {
4040

4141
return (rules, aggregationFunction) -> assemblerAdapter -> {
4242

4343
final var queryFunctions = rules.stream()
4444
.map(rule -> rule.apply(correlationIdResolver))
4545
.toList();
4646

47-
final Function<Iterable<T>, Stream<Publisher<? extends Map<ID, ?>>>> subQueryMapperBuilder = topLevelEntities -> queryFunctions.stream()
47+
final Function<Iterable<T>, Stream<Publisher<? extends Map<K, ?>>>> subQueryMapperBuilder = topLevelEntities -> queryFunctions.stream()
4848
.map(queryFunction -> queryFunction.apply(topLevelEntities));
4949

50-
final BiFunction<T, List<Map<ID, ?>>, R> joinMapperResultsFunction =
50+
final BiFunction<T, List<Map<K, ?>>, R> joinMapperResultsFunction =
5151
(topLevelEntity, listOfMapperResults) -> aggregationFunction.apply(topLevelEntity,
5252
listOfMapperResults.stream()
5353
.map(mapperResult -> mapperResult.get(correlationIdResolver.apply(topLevelEntity)))
5454
.toArray());
5555

56-
final BiFunction<Iterable<T>, List<Map<ID, ?>>, Stream<R>> aggregateStreamBuilder =
56+
final BiFunction<Iterable<T>, List<Map<K, ?>>, Stream<R>> aggregateStreamBuilder =
5757
(topLevelEntities, mapperResults) -> toStream(topLevelEntities)
5858
.filter(Objects::nonNull)
5959
.map(topLevelEntity -> joinMapperResultsFunction.apply(topLevelEntity, mapperResults));
@@ -65,88 +65,88 @@ static <T, ID, R> WithRulesBuilder<T, ID, R> withCorrelationIdResolver(Function<
6565
@FunctionalInterface
6666
interface WithCorrelationIdResolverBuilder<R> {
6767

68-
<T, ID> WithRulesBuilder<T, ID, R> withCorrelationIdResolver(Function<T, ID> correlationIdResolver);
68+
<T, K> WithRulesBuilder<T, K, R> withCorrelationIdResolver(Function<T, K> correlationIdResolver);
6969
}
7070

7171
@FunctionalInterface
72-
interface WithRulesBuilder<T, ID, R> {
72+
interface WithRulesBuilder<T, K, R> {
7373

7474
@SuppressWarnings("unchecked")
75-
default <E1> Builder<T, ID, R> withRules(
76-
Rule<T, ID, E1> rule,
75+
default <E1> Builder<T, K, R> withRules(
76+
Rule<T, K, E1> rule,
7777
BiFunction<T, E1, R> aggregationFunction) {
7878

7979
return withRules(List.of(rule), (t, s) -> aggregationFunction.apply(t, (E1) s[0]));
8080
}
8181

8282
@SuppressWarnings("unchecked")
83-
default <E1, E2> Builder<T, ID, R> withRules(
84-
Rule<T, ID, E1> rule1,
85-
Rule<T, ID, E2> rule2,
83+
default <E1, E2> Builder<T, K, R> withRules(
84+
Rule<T, K, E1> rule1,
85+
Rule<T, K, E2> rule2,
8686
Function3<T, E1, E2, R> aggregationFunction) {
8787

8888
return withRules(List.of(rule1, rule2), (t, s) -> aggregationFunction.apply(t, (E1) s[0], (E2) s[1]));
8989
}
9090

9191
@SuppressWarnings("unchecked")
92-
default <E1, E2, E3> Builder<T, ID, R> withRules(
93-
Rule<T, ID, E1> rule1,
94-
Rule<T, ID, E2> rule2,
95-
Rule<T, ID, E3> rule3,
92+
default <E1, E2, E3> Builder<T, K, R> withRules(
93+
Rule<T, K, E1> rule1,
94+
Rule<T, K, E2> rule2,
95+
Rule<T, K, E3> rule3,
9696
Function4<T, E1, E2, E3, R> aggregationFunction) {
9797

9898
return withRules(List.of(rule1, rule2, rule3),
9999
(t, s) -> aggregationFunction.apply(t, (E1) s[0], (E2) s[1], (E3) s[2]));
100100
}
101101

102102
@SuppressWarnings("unchecked")
103-
default <E1, E2, E3, E4> Builder<T, ID, R> withRules(
104-
Rule<T, ID, E1> rule1,
105-
Rule<T, ID, E2> rule2,
106-
Rule<T, ID, E3> rule3,
107-
Rule<T, ID, E4> rule4,
103+
default <E1, E2, E3, E4> Builder<T, K, R> withRules(
104+
Rule<T, K, E1> rule1,
105+
Rule<T, K, E2> rule2,
106+
Rule<T, K, E3> rule3,
107+
Rule<T, K, E4> rule4,
108108
Function5<T, E1, E2, E3, E4, R> aggregationFunction) {
109109

110110
return withRules(List.of(rule1, rule2, rule3, rule4),
111111
(t, s) -> aggregationFunction.apply(t, (E1) s[0], (E2) s[1], (E3) s[2], (E4) s[3]));
112112
}
113113

114114
@SuppressWarnings("unchecked")
115-
default <E1, E2, E3, E4, E5> Builder<T, ID, R> withRules(
116-
Rule<T, ID, E1> rule1,
117-
Rule<T, ID, E2> rule2,
118-
Rule<T, ID, E3> rule3,
119-
Rule<T, ID, E4> rule4,
120-
Rule<T, ID, E5> rule5,
115+
default <E1, E2, E3, E4, E5> Builder<T, K, R> withRules(
116+
Rule<T, K, E1> rule1,
117+
Rule<T, K, E2> rule2,
118+
Rule<T, K, E3> rule3,
119+
Rule<T, K, E4> rule4,
120+
Rule<T, K, E5> rule5,
121121
Function6<T, E1, E2, E3, E4, E5, R> aggregationFunction) {
122122

123123
return withRules(List.of(rule1, rule2, rule3, rule4, rule5),
124124
(t, s) -> aggregationFunction.apply(t, (E1) s[0], (E2) s[1], (E3) s[2], (E4) s[3], (E5) s[4]));
125125
}
126126

127127
@SuppressWarnings("unchecked")
128-
default <E1, E2, E3, E4, E5, E6> Builder<T, ID, R> withRules(
129-
Rule<T, ID, E1> rule1,
130-
Rule<T, ID, E2> rule2,
131-
Rule<T, ID, E3> rule3,
132-
Rule<T, ID, E4> rule4,
133-
Rule<T, ID, E5> rule5,
134-
Rule<T, ID, E6> rule6,
128+
default <E1, E2, E3, E4, E5, E6> Builder<T, K, R> withRules(
129+
Rule<T, K, E1> rule1,
130+
Rule<T, K, E2> rule2,
131+
Rule<T, K, E3> rule3,
132+
Rule<T, K, E4> rule4,
133+
Rule<T, K, E5> rule5,
134+
Rule<T, K, E6> rule6,
135135
Function7<T, E1, E2, E3, E4, E5, E6, R> aggregationFunction) {
136136

137137
return withRules(List.of(rule1, rule2, rule3, rule4, rule5, rule6),
138138
(t, s) -> aggregationFunction.apply(t, (E1) s[0], (E2) s[1], (E3) s[2], (E4) s[3], (E5) s[4], (E6) s[5]));
139139
}
140140

141141
@SuppressWarnings("unchecked")
142-
default <E1, E2, E3, E4, E5, E6, E7> Builder<T, ID, R> withRules(
143-
Rule<T, ID, E1> rule1,
144-
Rule<T, ID, E2> rule2,
145-
Rule<T, ID, E3> rule3,
146-
Rule<T, ID, E4> rule4,
147-
Rule<T, ID, E5> rule5,
148-
Rule<T, ID, E6> rule6,
149-
Rule<T, ID, E7> rule7,
142+
default <E1, E2, E3, E4, E5, E6, E7> Builder<T, K, R> withRules(
143+
Rule<T, K, E1> rule1,
144+
Rule<T, K, E2> rule2,
145+
Rule<T, K, E3> rule3,
146+
Rule<T, K, E4> rule4,
147+
Rule<T, K, E5> rule5,
148+
Rule<T, K, E6> rule6,
149+
Rule<T, K, E7> rule7,
150150
Function8<T, E1, E2, E3, E4, E5, E6, E7, R> aggregationFunction) {
151151

152152
return withRules(List.of(rule1, rule2, rule3, rule4, rule5, rule6, rule7),
@@ -155,15 +155,15 @@ default <E1, E2, E3, E4, E5, E6, E7> Builder<T, ID, R> withRules(
155155
}
156156

157157
@SuppressWarnings("unchecked")
158-
default <E1, E2, E3, E4, E5, E6, E7, E8> Builder<T, ID, R> withRules(
159-
Rule<T, ID, E1> rule1,
160-
Rule<T, ID, E2> rule2,
161-
Rule<T, ID, E3> rule3,
162-
Rule<T, ID, E4> rule4,
163-
Rule<T, ID, E5> rule5,
164-
Rule<T, ID, E6> rule6,
165-
Rule<T, ID, E7> rule7,
166-
Rule<T, ID, E8> rule8,
158+
default <E1, E2, E3, E4, E5, E6, E7, E8> Builder<T, K, R> withRules(
159+
Rule<T, K, E1> rule1,
160+
Rule<T, K, E2> rule2,
161+
Rule<T, K, E3> rule3,
162+
Rule<T, K, E4> rule4,
163+
Rule<T, K, E5> rule5,
164+
Rule<T, K, E6> rule6,
165+
Rule<T, K, E7> rule7,
166+
Rule<T, K, E8> rule8,
167167
Function9<T, E1, E2, E3, E4, E5, E6, E7, E8, R> aggregationFunction) {
168168

169169
return withRules(List.of(rule1, rule2, rule3, rule4, rule5, rule6, rule7, rule8),
@@ -172,16 +172,16 @@ default <E1, E2, E3, E4, E5, E6, E7, E8> Builder<T, ID, R> withRules(
172172
}
173173

174174
@SuppressWarnings("unchecked")
175-
default <E1, E2, E3, E4, E5, E6, E7, E8, E9> Builder<T, ID, R> withRules(
176-
Rule<T, ID, E1> rule1,
177-
Rule<T, ID, E2> rule2,
178-
Rule<T, ID, E3> rule3,
179-
Rule<T, ID, E4> rule4,
180-
Rule<T, ID, E5> rule5,
181-
Rule<T, ID, E6> rule6,
182-
Rule<T, ID, E7> rule7,
183-
Rule<T, ID, E8> rule8,
184-
Rule<T, ID, E9> rule9,
175+
default <E1, E2, E3, E4, E5, E6, E7, E8, E9> Builder<T, K, R> withRules(
176+
Rule<T, K, E1> rule1,
177+
Rule<T, K, E2> rule2,
178+
Rule<T, K, E3> rule3,
179+
Rule<T, K, E4> rule4,
180+
Rule<T, K, E5> rule5,
181+
Rule<T, K, E6> rule6,
182+
Rule<T, K, E7> rule7,
183+
Rule<T, K, E8> rule8,
184+
Rule<T, K, E9> rule9,
185185
Function10<T, E1, E2, E3, E4, E5, E6, E7, E8, E9, R> aggregationFunction) {
186186

187187
return withRules(List.of(rule1, rule2, rule3, rule4, rule5, rule6, rule7, rule8, rule9),
@@ -190,17 +190,17 @@ default <E1, E2, E3, E4, E5, E6, E7, E8, E9> Builder<T, ID, R> withRules(
190190
}
191191

192192
@SuppressWarnings("unchecked")
193-
default <E1, E2, E3, E4, E5, E6, E7, E8, E9, E10> Builder<T, ID, R> withRules(
194-
Rule<T, ID, E1> rule1,
195-
Rule<T, ID, E2> rule2,
196-
Rule<T, ID, E3> rule3,
197-
Rule<T, ID, E4> rule4,
198-
Rule<T, ID, E5> rule5,
199-
Rule<T, ID, E6> rule6,
200-
Rule<T, ID, E7> rule7,
201-
Rule<T, ID, E8> rule8,
202-
Rule<T, ID, E9> rule9,
203-
Rule<T, ID, E10> rule10,
193+
default <E1, E2, E3, E4, E5, E6, E7, E8, E9, E10> Builder<T, K, R> withRules(
194+
Rule<T, K, E1> rule1,
195+
Rule<T, K, E2> rule2,
196+
Rule<T, K, E3> rule3,
197+
Rule<T, K, E4> rule4,
198+
Rule<T, K, E5> rule5,
199+
Rule<T, K, E6> rule6,
200+
Rule<T, K, E7> rule7,
201+
Rule<T, K, E8> rule8,
202+
Rule<T, K, E9> rule9,
203+
Rule<T, K, E10> rule10,
204204
Function11<T, E1, E2, E3, E4, E5, E6, E7, E8, E9, E10, R> aggregationFunction) {
205205

206206
return withRules(List.of(rule1, rule2, rule3, rule4, rule5, rule6, rule7, rule8, rule9, rule10),
@@ -209,30 +209,30 @@ default <E1, E2, E3, E4, E5, E6, E7, E8, E9, E10> Builder<T, ID, R> withRules(
209209
}
210210

211211
@SuppressWarnings("unchecked")
212-
default <E1, E2, E3, E4, E5, E6, E7, E8, E9, E10, E11> Builder<T, ID, R> withRules(
213-
Rule<T, ID, E1> rule1,
214-
Rule<T, ID, E2> rule2,
215-
Rule<T, ID, E3> rule3,
216-
Rule<T, ID, E4> rule4,
217-
Rule<T, ID, E5> rule5,
218-
Rule<T, ID, E6> rule6,
219-
Rule<T, ID, E7> rule7,
220-
Rule<T, ID, E8> rule8,
221-
Rule<T, ID, E9> rule9,
222-
Rule<T, ID, E10> rule10,
223-
Rule<T, ID, E11> rule11,
212+
default <E1, E2, E3, E4, E5, E6, E7, E8, E9, E10, E11> Builder<T, K, R> withRules(
213+
Rule<T, K, E1> rule1,
214+
Rule<T, K, E2> rule2,
215+
Rule<T, K, E3> rule3,
216+
Rule<T, K, E4> rule4,
217+
Rule<T, K, E5> rule5,
218+
Rule<T, K, E6> rule6,
219+
Rule<T, K, E7> rule7,
220+
Rule<T, K, E8> rule8,
221+
Rule<T, K, E9> rule9,
222+
Rule<T, K, E10> rule10,
223+
Rule<T, K, E11> rule11,
224224
Function12<T, E1, E2, E3, E4, E5, E6, E7, E8, E9, E10, E11, R> aggregationFunction) {
225225

226226
return withRules(List.of(rule1, rule2, rule3, rule4, rule5, rule6, rule7, rule8, rule9, rule10, rule11),
227227
(t, s) -> aggregationFunction.apply(
228228
t, (E1) s[0], (E2) s[1], (E3) s[2], (E4) s[3], (E5) s[4], (E6) s[5], (E7) s[6], (E8) s[7], (E9) s[8], (E10) s[9], (E11) s[10]));
229229
}
230230

231-
Builder<T, ID, R> withRules(List<Rule<T, ID, ?>> rules, BiFunction<T, Object[], R> aggregationFunction);
231+
Builder<T, K, R> withRules(List<Rule<T, K, ?>> rules, BiFunction<T, Object[], R> aggregationFunction);
232232
}
233233

234234
@FunctionalInterface
235-
interface Builder<T, ID, R> {
235+
interface Builder<T, K, R> {
236236

237237
default Assembler<T, R> build() {
238238
return build(fluxAdapter());
@@ -242,6 +242,6 @@ default Assembler<T, R> build(Scheduler scheduler) {
242242
return build(fluxAdapter(scheduler));
243243
}
244244

245-
Assembler<T, R> build(AssemblerAdapter<T, ID, R> adapter);
245+
Assembler<T, R> build(AssemblerAdapter<T, K, R> adapter);
246246
}
247247
}

assembler/src/main/java/io/github/pellse/assembler/QueryUtils.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,35 +21,32 @@
2121
import reactor.core.publisher.Flux;
2222
import reactor.core.publisher.Mono;
2323

24-
import java.util.Collection;
25-
import java.util.HashMap;
26-
import java.util.Map;
27-
import java.util.Set;
24+
import java.util.*;
2825
import java.util.function.Function;
2926
import java.util.function.Supplier;
3027

3128
import static io.github.pellse.assembler.RuleMapperSource.nullToEmptySource;
3229
import static io.github.pellse.util.ObjectUtils.isSafeEqual;
33-
import static io.github.pellse.util.ObjectUtils.then;
34-
import static io.github.pellse.util.collection.CollectionUtils.transform;
35-
import static io.github.pellse.util.collection.CollectionUtils.translate;
30+
import static io.github.pellse.util.collection.CollectionUtils.*;
3631
import static java.util.Objects.*;
3732
import static java.util.function.Predicate.not;
3833
import static reactor.core.publisher.Flux.fromIterable;
3934

4035
public interface QueryUtils {
4136

42-
static <T, TC extends Collection<T>, ID, EID, R, RRC, CTX extends RuleMapperContext<T, TC, ID, EID, R, RRC>> Function<Iterable<T>, Mono<Map<ID, RRC>>> buildQueryFunction(
43-
RuleMapperSource<T, TC, ID, EID, R, RRC, CTX> ruleMapperSource,
44-
CTX ruleMapperContext) {
37+
static <T, TC extends Collection<T>, K, ID, EID, R, RRC, CTX extends RuleMapperContext<T, TC, K, ID, EID, R, RRC>> Function<Iterable<T>, Mono<Map<ID, RRC>>> buildQueryFunction(
38+
RuleMapperSource<T, TC, K, ID, EID, R, RRC, CTX> ruleMapperSource,
39+
CTX ctx) {
4540

46-
final var queryFunction = nullToEmptySource(ruleMapperSource).apply(ruleMapperContext);
41+
final var queryFunction = nullToEmptySource(ruleMapperSource).apply(ctx);
4742

48-
return entityList ->
49-
then(translate(entityList, ruleMapperContext.topLevelCollectionFactory()), entities ->
50-
safeApply(entities, queryFunction)
51-
.collect(ruleMapperContext.mapCollector().apply(entities.size()))
52-
.map(map -> toResultMap(entities, map, ruleMapperContext.topLevelIdResolver(), ruleMapperContext.defaultResultProvider())));
43+
return entityList -> {
44+
var entities = translate(entityList, ctx.topLevelCollectionFactory());
45+
46+
return safeApply(entities, queryFunction)
47+
.collect(ctx.mapCollector().apply(entities.size()))
48+
.map(map -> toResultMap(entities, map, ctx.outerIdResolver(), ctx.defaultResultProvider()));
49+
};
5350
}
5451

5552
static <T, TC extends Collection<T>, R> Function<TC, Publisher<R>> toPublisher(Function<TC, Iterable<R>> queryFunction) {
@@ -79,8 +76,10 @@ static <T, ID, RRC> Map<ID, RRC> toResultMap(
7976

8077
static <ID, RRC> Map<ID, RRC> initializeResultMap(Collection<ID> ids, Map<ID, RRC> resultMap, Function<ID, RRC> defaultResultProvider) {
8178
final Function<ID, RRC> resultProvider = requireNonNullElse(defaultResultProvider, id -> null);
82-
final Set<ID> idsFromQueryResult = resultMap.keySet();
83-
final Map<ID, RRC> resultMapCopy = new HashMap<>(resultMap);
79+
80+
final Map<ID, RRC> resultLinkedHashMap = toLinkedHashMap(resultMap);
81+
final Set<ID> idsFromQueryResult = resultLinkedHashMap.keySet();
82+
final Map<ID, RRC> resultMapCopy = new LinkedHashMap<>(resultLinkedHashMap);
8483

8584
// defaultResultProvider can provide a null value, so we cannot use a Collector here
8685
// as it would throw a NullPointerException

0 commit comments

Comments
 (0)