diff --git a/docs/changelog/131286.yaml b/docs/changelog/131286.yaml new file mode 100644 index 0000000000000..ce1fc56860fc5 --- /dev/null +++ b/docs/changelog/131286.yaml @@ -0,0 +1,5 @@ +pr: 131286 +summary: Allow remote enrich after LOOKUP JOIN +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index bdf0413a03d02..77ad020929fc4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1875,6 +1875,76 @@ type:keyword | language_code:integer | language_name:keyword Production | 3 | Spanish ; +enrichAfterLookupJoin +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH languages_policy ON language_code +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +; + +remoteEnrichAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +# TODO: a bunch more tests, also switch orders, use double _remote enrich, double lookup join etc. Also add tests with +# _coordinator enrich. What about ROW? + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH _remote:languages_policy ON language_code +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +; + +remoteEnrichSortAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH _remote:languages_policy ON language_code +| SORT message ASC +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +Connected to 10.1.0.2 | 1 | Success | English +; + +sortRemoteEnrichAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| SORT message ASC +| ENRICH _remote:languages_policy ON language_code +| LIMIT 2 +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +Connected to 10.1.0.2 | 1 | Success | English +; ############################################### # LOOKUP JOIN on mixed numerical fields diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 815b08409723c..cea8bbefc0a5c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception { SimplePauseFieldPlugin.allowEmitting.countDown(); try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) { - // Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent + // Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent // because we stopped it before processing the data assertThat( getValuesList(resp), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 4759579b94d24..618369704d4d5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1205,6 +1205,12 @@ public enum Cap { */ ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()), + /** + * Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN}, + * see java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH + */ + REMOTE_ENRICH_AFTER_LOOKUP_JOIN, + /** * MATCH PHRASE function */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 9e232bd8f02f9..7c40b4e342b53 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -463,7 +463,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) { @Override public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) { - return p -> { + return child -> { var source = source(ctx); Tuple tuple = parsePolicyName(ctx.policyName); Mode mode = tuple.v1(); @@ -482,9 +482,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) { } List keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class); + + // If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too. + LogicalPlan updatedChild = (mode == Mode.REMOTE) == false + ? child + : child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true)); + return new Enrich( source, - p, + updatedChild, mode, Literal.keyword(source(ctx.policyName), policyNameString), matchField, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 4d1d65d63932d..f9aa26b0987de 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -87,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { PhysicalPlan mappedChild = map(unary.child()); // - // TODO - this is hard to follow and needs reworking + // TODO - this is hard to follow, causes bugs and needs reworking // https://github.com/elastic/elasticsearch/issues/115897 // if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) { @@ -102,6 +102,8 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway). Holder hasFragment = new Holder<>(false); + // Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice; + // include the plan up until this ENRICH in the fragment. var childTransformed = mappedChild.transformUp(f -> { // Once we reached FragmentExec, we stuff our Enrich under it if (f instanceof FragmentExec) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java index cbb825ca9581b..fbfa18dccc477 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java @@ -36,9 +36,9 @@ import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; public final class AnalyzerTestUtils { @@ -61,27 +61,36 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map lookupResolution, Verifier verifier) { + return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier); + } + + public static Analyzer analyzer( + IndexResolution indexResolution, + Map lookupResolution, + EnrichResolution enrichResolution, + Verifier verifier + ) { + return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG); + } + + public static Analyzer analyzer( + IndexResolution indexResolution, + Map lookupResolution, + EnrichResolution enrichResolution, + Verifier verifier, + Configuration config + ) { return new Analyzer( new AnalyzerContext( - EsqlTestUtils.TEST_CFG, + config, new EsqlFunctionRegistry(), indexResolution, lookupResolution, - defaultEnrichResolution(), + enrichResolution, defaultInferenceResolution() ), verifier @@ -89,17 +98,7 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code") ); assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN")); - } private void checkFullTextFunctionsInStats(String functionInvocation) {