Skip to content

ESQL: Allow remote enrich after LOOKUP JOIN #131286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131286.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131286
summary: Allow remote enrich after LOOKUP JOIN
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,76 @@ type:keyword | language_code:integer | language_name:keyword
Production | 3 | Spanish
;

enrichAfterLookupJoin
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

# TODO: add many more tests: switch the command order, use two _remote enriches, two lookup joins, etc. Also add tests with
# _coordinator enrich. What about ROW?

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;

remoteEnrichSortAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH _remote:languages_policy ON language_code
| SORT message ASC
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;

sortRemoteEnrichAfterLookupJoin
required_capability: join_lookup_v12
required_capability: remote_enrich_after_lookup_join

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| SORT message ASC
| ENRICH _remote:languages_policy ON language_code
| LIMIT 2
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
Connected to 10.1.0.2 | 1 | Success | English
;

###############################################
# LOOKUP JOIN on mixed numerical fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception {
SimplePauseFieldPlugin.allowEmitting.countDown();

try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
// Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
// Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. There's one more in CrossClusterEnrichUnavailableClustersIT, by the way, if we're already fixing it :)

// because we stopped it before processing the data
assertThat(
getValuesList(resp),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,12 @@ public enum Cap {
*/
ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),

/**
* Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
* see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
*/
REMOTE_ENRICH_AFTER_LOOKUP_JOIN,

/**
* MATCH PHRASE function
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) {

@Override
public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
return p -> {
return child -> {
var source = source(ctx);
Tuple<Mode, String> tuple = parsePolicyName(ctx.policyName);
Mode mode = tuple.v1();
Expand All @@ -482,9 +482,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
}

List<NamedExpression> keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class);

// If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too.
LogicalPlan updatedChild = (mode == Mode.REMOTE) == false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
Maybe we should flip the condition to avoid (==) == false?

? child
: child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true));

return new Enrich(
source,
p,
updatedChild,
mode,
Literal.keyword(source(ctx.policyName), policyNameString),
matchField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
PhysicalPlan mappedChild = map(unary.child());

//
// TODO - this is hard to follow and needs reworking
// TODO - this is hard to follow, causes bugs and needs reworking
// https://github.com/elastic/elasticsearch/issues/115897
//
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
Expand All @@ -102,6 +102,8 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
Holder<Boolean> hasFragment = new Holder<>(false);

// Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
// include the plan up until this ENRICH in the fragment.
var childTransformed = mappedChild.transformUp(f -> {
// Once we reached FragmentExec, we stuff our Enrich under it
if (f instanceof FragmentExec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;

public final class AnalyzerTestUtils {

Expand All @@ -61,45 +61,44 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
emptyInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), verifier);
}

/**
 * Creates a test {@link Analyzer} for the given index and lookup resolutions.
 * Delegates to the overload that also accepts an {@code EnrichResolution}, passing {@code defaultEnrichResolution()}.
 *
 * @param indexResolution the resolution of the main index
 * @param lookupResolution the lookup index resolutions, keyed by {@code String}
 * @param verifier the verifier handed to the analyzer
 * @return the analyzer produced by the delegated overload
 */
public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
}

/**
 * Creates a test {@link Analyzer} for the given index, lookup and enrich resolutions.
 * Delegates to the overload that also accepts a {@code Configuration}, passing {@code TEST_CFG}.
 *
 * @param indexResolution the resolution of the main index
 * @param lookupResolution the lookup index resolutions, keyed by {@code String}
 * @param enrichResolution the enrich policy resolution to analyze against
 * @param verifier the verifier handed to the analyzer
 * @return the analyzer produced by the delegated overload
 */
public static Analyzer analyzer(
IndexResolution indexResolution,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
Verifier verifier
) {
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
}

public static Analyzer analyzer(
IndexResolution indexResolution,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
Verifier verifier,
Configuration config
) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
config,
new EsqlFunctionRegistry(),
indexResolution,
lookupResolution,
defaultEnrichResolution(),
enrichResolution,
defaultInferenceResolution()
),
verifier
);
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
return new Analyzer(
new AnalyzerContext(
config,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
defaultInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
}

public static Analyzer analyzer(Verifier verifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.parser.QueryParam;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -38,9 +39,13 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadEnrichPolicyResolution;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
Expand Down Expand Up @@ -2268,6 +2273,78 @@ public void testRemoteLookupJoinWithPipelineBreaker() {
);
}

/**
 * Checks the verification errors reported when a {@code LOOKUP JOIN} followed by {@code ENRICH _remote:...}
 * is preceded by STATS, SORT, LIMIT or a {@code _coordinator} ENRICH.
 * Each case asserts the error text together with its line:column position in the query.
 */
public void testRemoteEnrichAfterLookupJoinWithPipelineBreaker() {
// Register two match policies backed by the same index and mapping: "languages" in REMOTE mode
// and "languages_coord" in COORDINATOR mode.
EnrichResolution enrichResolution = new EnrichResolution();
loadEnrichPolicyResolution(
enrichResolution,
Enrich.Mode.REMOTE,
MATCH_TYPE,
"languages",
"language_code",
"languages_idx",
"mapping-languages.json"
);
loadEnrichPolicyResolution(
enrichResolution,
Enrich.Mode.COORDINATOR,
MATCH_TYPE,
"languages_coord",
"language_code",
"languages_idx",
"mapping-languages.json"
);
// Analyzer over the "test" index using the default lookup resolution and the enrich policies loaded above.
var analyzer = AnalyzerTestUtils.analyzer(
loadMapping("mapping-default.json", "test"),
defaultLookupResolution(),
enrichResolution,
TEST_VERIFIER
);

// STATS before the LOOKUP JOIN: an error is expected for the LOOKUP JOIN (line 4) and for the remote ENRICH (line 5).
// NOTE(review): the LOOKUP JOIN is reported as "with remote indices" although the query names no remote index;
// presumably the downstream remote ENRICH marks it as remote - confirm against the parser change.
String err = error("""
FROM test
| STATS c = COUNT(*) by languages
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(
err,
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@2:3")
);
assertThat(err, containsString("5:3: ENRICH with remote policy can't be executed after STATS"));

// SORT before the LOOKUP JOIN: only the LOOKUP JOIN error is asserted.
err = error("""
FROM test
| SORT emp_no
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@2:3"));

// LIMIT before the LOOKUP JOIN: only the LOOKUP JOIN error is asserted.
err = error("""
FROM test
| LIMIT 2
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@2:3"));

// A coordinator-mode ENRICH before the LOOKUP JOIN: an error is expected for the LOOKUP JOIN (line 4)
// and for the remote ENRICH (line 5).
err = error("""
FROM test
| EVAL language_code = languages
| ENRICH _coordinator:languages_coord
| LOOKUP JOIN languages_lookup ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(
err,
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@3:3")
);
assertThat(err, containsString("5:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
}

public void testRemoteLookupJoinIsSnapshot() {
// TODO: remove when we allow remote joins in release builds
assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled());
Expand All @@ -2282,7 +2359,6 @@ public void testRemoteLookupJoinIsDisabled() {
() -> query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code")
);
assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN"));

}

private void checkFullTextFunctionsInStats(String functionInvocation) {
Expand Down
Loading