Skip to content

Commit 14ade95

Browse files
alex-spieselasticsearchmachine
andauthored
[9.0] ESQL: Disallow remote enrich after lu join (#131426) (#131534)
* ESQL: Disallow remote enrich after lu join (#131426) Fix #129372 Due to how remote ENRICH is [planned](https://github.com/elastic/elasticsearch/blob/32e50d0d94e27ee559d24bf9d5463ba6e64d1788/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java#L93), it interacts in special ways with pipeline breakers, in particular LIMIT and TopN; when these are encountered upstream from a remote ENRICH, these nodes are copied and executed a second time after the remote ENRICH. We'd like to allow remote ENRICH after LOOKUP JOIN, but that forces the lookup to be remote as well; this has its own interactions with pipeline breakers: in particular, LIMITs and TopNs cannot just be duplicated after LOOKUP JOIN, as LOOKUP JOIN may add new rows. For now, let's just forbid any usage of remote ENRICH after LOOKUP JOINs; remote ENRICH is mostly relevant for CCS, and LOOKUP JOIN doesn't support that in 9.1/8.19, anyway. There is separate work that enables remote LOOKUP JOINs on remote clusters and adds the correct validations; we can later build support for remote ENRICH + LOOKUP JOIN on top of that. (C.f. my comment [here](#129372 (comment)) and my draft #131286 for enabling this.) (cherry picked from commit 06e39c0) # Conflicts: # x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 9605b05 commit 14ade95

File tree

7 files changed

+422
-36
lines changed

7 files changed

+422
-36
lines changed

docs/changelog/131426.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131426
2+
summary: Disallow remote enrich after lu join
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 129372

x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,3 +661,104 @@ from *
661661
author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
662662
Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null
663663
;
664+
665+
666+
statsAfterRemoteEnrich
667+
required_capability: enrich_load
668+
669+
FROM sample_data
670+
| KEEP message
671+
| WHERE message IN ("Connected to 10.1.0.1", "Connected to 10.1.0.2")
672+
| EVAL language_code = "1"
673+
| ENRICH _remote:languages_policy ON language_code
674+
| STATS messages = count_distinct(message) BY language_name
675+
;
676+
677+
messages:long | language_name:keyword
678+
2 | English
679+
;
680+
681+
682+
enrichAfterRemoteEnrich
683+
required_capability: enrich_load
684+
685+
FROM sample_data
686+
| KEEP message
687+
| WHERE message IN ("Connected to 10.1.0.1")
688+
| EVAL language_code = "1"
689+
| ENRICH _remote:languages_policy ON language_code
690+
| RENAME language_name AS first_language_name
691+
| ENRICH languages_policy ON language_code
692+
;
693+
694+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
695+
Connected to 10.1.0.1 | 1 | English | English
696+
;
697+
698+
699+
coordinatorEnrichAfterRemoteEnrich
700+
required_capability: enrich_load
701+
702+
FROM sample_data
703+
| KEEP message
704+
| WHERE message IN ("Connected to 10.1.0.1")
705+
| EVAL language_code = "1"
706+
| ENRICH _remote:languages_policy ON language_code
707+
| RENAME language_name AS first_language_name
708+
| ENRICH _coordinator:languages_policy ON language_code
709+
;
710+
711+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
712+
Connected to 10.1.0.1 | 1 | English | English
713+
;
714+
715+
716+
doubleRemoteEnrich
717+
required_capability: enrich_load
718+
719+
FROM sample_data
720+
| KEEP message
721+
| WHERE message IN ("Connected to 10.1.0.1")
722+
| EVAL language_code = "1"
723+
| ENRICH _remote:languages_policy ON language_code
724+
| RENAME language_name AS first_language_name
725+
| ENRICH _remote:languages_policy ON language_code
726+
;
727+
728+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
729+
Connected to 10.1.0.1 | 1 | English | English
730+
;
731+
732+
733+
enrichAfterCoordinatorEnrich
734+
required_capability: enrich_load
735+
736+
FROM sample_data
737+
| KEEP message
738+
| WHERE message IN ("Connected to 10.1.0.1")
739+
| EVAL language_code = "1"
740+
| ENRICH _coordinator:languages_policy ON language_code
741+
| RENAME language_name AS first_language_name
742+
| ENRICH languages_policy ON language_code
743+
;
744+
745+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
746+
Connected to 10.1.0.1 | 1 | English | English
747+
;
748+
749+
750+
doubleCoordinatorEnrich
751+
required_capability: enrich_load
752+
753+
FROM sample_data
754+
| KEEP message
755+
| WHERE message IN ("Connected to 10.1.0.1")
756+
| EVAL language_code = "1"
757+
| ENRICH _coordinator:languages_policy ON language_code
758+
| RENAME language_name AS first_language_name
759+
| ENRICH _coordinator:languages_policy ON language_code
760+
;
761+
762+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
763+
Connected to 10.1.0.1 | 1 | English | English
764+
;

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1742,3 +1742,101 @@ FROM sample_data_ts_nanos
17421742
2023-10-23T12:27:28.948123456Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
17431743
2023-10-23T12:15:03.360123456Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
17441744
;
1745+
1746+
###############################################
1747+
# LOOKUP JOIN and ENRICH
1748+
###############################################
1749+
1750+
enrichAfterLookupJoin
1751+
required_capability: join_lookup_v12
1752+
1753+
FROM sample_data
1754+
| KEEP message
1755+
| WHERE message == "Connected to 10.1.0.1"
1756+
| EVAL language_code = "1"
1757+
| LOOKUP JOIN message_types_lookup ON message
1758+
| ENRICH languages_policy ON language_code
1759+
;
1760+
1761+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
1762+
Connected to 10.1.0.1 | 1 | Success | English
1763+
;
1764+
1765+
1766+
lookupJoinAfterEnrich
1767+
required_capability: join_lookup_v12
1768+
1769+
FROM sample_data
1770+
| KEEP message
1771+
| WHERE message == "Connected to 10.1.0.1"
1772+
| EVAL language_code = "1"
1773+
| ENRICH languages_policy ON language_code
1774+
| LOOKUP JOIN message_types_lookup ON message
1775+
;
1776+
1777+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
1778+
Connected to 10.1.0.1 | 1 | English | Success
1779+
;
1780+
1781+
1782+
lookupJoinAfterRemoteEnrich
1783+
required_capability: join_lookup_v12
1784+
1785+
FROM sample_data
1786+
| KEEP message
1787+
| WHERE message == "Connected to 10.1.0.1"
1788+
| EVAL language_code = "1"
1789+
| ENRICH _remote:languages_policy ON language_code
1790+
| LOOKUP JOIN message_types_lookup ON message
1791+
;
1792+
1793+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
1794+
Connected to 10.1.0.1 | 1 | English | Success
1795+
;
1796+
1797+
1798+
lookupJoinAfterLimitAndRemoteEnrich
1799+
required_capability: join_lookup_v12
1800+
1801+
FROM sample_data
1802+
| KEEP message
1803+
| WHERE message == "Connected to 10.1.0.1"
1804+
| EVAL language_code = "1"
1805+
| LIMIT 1
1806+
| ENRICH _remote:languages_policy ON language_code
1807+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
1808+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
1809+
| KEEP message, enrich_language_name, language_name, country.keyword
1810+
| SORT language_name, country.keyword
1811+
;
1812+
1813+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
1814+
Connected to 10.1.0.1 | English | English | Canada
1815+
Connected to 10.1.0.1 | English | English | United States of America
1816+
Connected to 10.1.0.1 | English | English | null
1817+
Connected to 10.1.0.1 | English | null | United Kingdom
1818+
;
1819+
1820+
1821+
lookupJoinAfterTopNAndRemoteEnrich
1822+
required_capability: join_lookup_v12
1823+
1824+
FROM sample_data
1825+
| KEEP message
1826+
| WHERE message == "Connected to 10.1.0.1"
1827+
| EVAL language_code = "1"
1828+
| SORT message
1829+
| LIMIT 1
1830+
| ENRICH _remote:languages_policy ON language_code
1831+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
1832+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
1833+
| KEEP message, enrich_language_name, language_name, country.keyword
1834+
| SORT language_name, country.keyword
1835+
;
1836+
1837+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
1838+
Connected to 10.1.0.1 | English | English | Canada
1839+
Connected to 10.1.0.1 | English | English | United States of America
1840+
Connected to 10.1.0.1 | English | English | null
1841+
Connected to 10.1.0.1 | English | null | United Kingdom
1842+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.xpack.esql.index.EsIndex;
3636
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3737
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
38+
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
3839

3940
import java.io.IOException;
4041
import java.util.ArrayList;
@@ -295,23 +296,43 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
295296
* retaining the originating cluster and restructing pages for routing, which might be complicated.
296297
*/
297298
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
298-
boolean[] agg = { false };
299-
boolean[] enrichCoord = { false };
299+
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
300+
// in separate FORK branches which are valid by themselves.
301+
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
302+
}
303+
304+
/**
305+
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
306+
*/
307+
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
308+
if (enrich.mode != Mode.REMOTE) {
309+
return;
310+
}
311+
312+
// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
313+
// https://github.com/elastic/elasticsearch/issues/131445
314+
boolean[] aggregate = { false };
315+
boolean[] coordinatorOnlyEnrich = { false };
316+
boolean[] lookupJoin = { false };
300317

301-
plan.forEachUp(UnaryPlan.class, u -> {
318+
enrich.forEachUp(LogicalPlan.class, u -> {
302319
if (u instanceof Aggregate) {
303-
agg[0] = true;
304-
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
305-
enrichCoord[0] = true;
306-
}
307-
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
308-
if (agg[0]) {
309-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
310-
}
311-
if (enrichCoord[0]) {
312-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
313-
}
320+
aggregate[0] = true;
321+
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
322+
coordinatorOnlyEnrich[0] = true;
323+
} else if (u instanceof LookupJoin) {
324+
lookupJoin[0] = true;
314325
}
315326
});
327+
328+
if (aggregate[0]) {
329+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
330+
}
331+
if (coordinatorOnlyEnrich[0]) {
332+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
333+
}
334+
if (lookupJoin[0]) {
335+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
336+
}
316337
}
317338
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
7878
PhysicalPlan mappedChild = map(unary.child());
7979

8080
//
81-
// TODO - this is hard to follow and needs reworking
81+
// TODO - this is hard to follow, causes bugs and needs reworking
8282
// https://github.com/elastic/elasticsearch/issues/115897
8383
//
8484
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
2828
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
2929
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
30+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
3031
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
3132
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
3233

@@ -51,36 +52,37 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
5152
}
5253

5354
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
54-
return new Analyzer(
55-
new AnalyzerContext(
56-
EsqlTestUtils.TEST_CFG,
57-
new EsqlFunctionRegistry(),
58-
indexResolution,
59-
defaultLookupResolution(),
60-
defaultEnrichResolution()
61-
),
62-
verifier
63-
);
55+
return analyzer(indexResolution, defaultLookupResolution(), verifier);
6456
}
6557

6658
public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
59+
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
60+
}
61+
62+
public static Analyzer analyzer(
63+
IndexResolution indexResolution,
64+
Map<String, IndexResolution> lookupResolution,
65+
EnrichResolution enrichResolution,
66+
Verifier verifier
67+
) {
68+
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
69+
}
70+
71+
public static Analyzer analyzer(
72+
IndexResolution indexResolution,
73+
Map<String, IndexResolution> lookupResolution,
74+
EnrichResolution enrichResolution,
75+
Verifier verifier,
76+
Configuration config
77+
) {
6778
return new Analyzer(
68-
new AnalyzerContext(
69-
EsqlTestUtils.TEST_CFG,
70-
new EsqlFunctionRegistry(),
71-
indexResolution,
72-
lookupResolution,
73-
defaultEnrichResolution()
74-
),
79+
new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, lookupResolution, enrichResolution),
7580
verifier
7681
);
7782
}
7883

7984
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
80-
return new Analyzer(
81-
new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultLookupResolution(), defaultEnrichResolution()),
82-
verifier
83-
);
85+
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
8486
}
8587

8688
public static Analyzer analyzer(Verifier verifier) {
@@ -180,6 +182,25 @@ public static void loadEnrichPolicyResolution(
180182
);
181183
}
182184

185+
public static void loadEnrichPolicyResolution(
186+
EnrichResolution enrich,
187+
Enrich.Mode mode,
188+
String policyType,
189+
String policy,
190+
String field,
191+
String index,
192+
String mapping
193+
) {
194+
IndexResolution indexResolution = loadMapping(mapping, index);
195+
List<String> enrichFields = new ArrayList<>(indexResolution.get().mapping().keySet());
196+
enrichFields.remove(field);
197+
enrich.addResolvedPolicy(
198+
policy,
199+
mode,
200+
new ResolvedEnrichPolicy(field, policyType, enrichFields, Map.of("", index), indexResolution.get().mapping())
201+
);
202+
}
203+
183204
public static void loadEnrichPolicyResolution(EnrichResolution enrich, String policy, String field, String index, String mapping) {
184205
loadEnrichPolicyResolution(enrich, EnrichPolicy.MATCH_TYPE, policy, field, index, mapping);
185206
}

0 commit comments

Comments
 (0)