Improve Expanding Lookup Join performance by pushing a filter to the right side of the lookup join #132889

Closed
wants to merge 10 commits

Conversation


@julian-elastic julian-elastic commented Aug 13, 2025

Improve Expanding Lookup Join performance by pushing a filter to the right side of the lookup join.

As this is a performance optimization, we don't want to break behavior for old nodes, e.g. during CCS (cross-cluster search). The filter that we push down is optional, and it is always reapplied after the lookup join. As a result, if all nodes involved are new, we get the full performance benefit. Otherwise there may be partial or no performance benefit, but the query still executes successfully and returns correct results, as long as it worked before this optimization.

Preliminary results indicate around a 90x improvement from the optimization for Lucene-pushable filters, on a test case specifically designed to demonstrate its benefits; customers are likely to see more limited gains. The test case is an expanding lookup join of a 100,000-row table with a 10,000-row lookup table, using a filter with 0.1% selectivity (it keeps 10 of the 10,000 lookup rows). Without the optimization the filter is not pushed to the right side, and the join explodes: 100,000 x 10,000 = 1,000,000,000 rows after the join, which are then filtered down to 1,000,000 rows. With the optimization the filter is applied early, so the expanding join produces only 100,000 x 10 = 1,000,000 rows. This reduces the maximum number of rows by a factor of 1,000 and makes the query 90 times faster.
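The safety argument above — the pushed-down filter references only right-side columns, so filtering the right side before the expanding join yields exactly the same rows as filtering after it — can be sketched with toy data (a hypothetical Python model, not the ES|QL implementation; row counts are scaled down 1,000x):

```python
# Toy model of an expanding lookup join: every left row pairs with every
# right row that shares its join key (all keys are "A" here, as in the test).
left = [{"join_key": "A", "value_left": f"text {i}"} for i in range(100)]
right = [{"join_key": "A", "filter_field_kw": "match" if i < 10 else "no_match"}
         for i in range(1000)]

def join(l_rows, r_rows):
    return [{**l, **r} for l in l_rows for r in r_rows
            if l["join_key"] == r["join_key"]]

pred = lambda row: row["filter_field_kw"] == "match"

# Without pushdown: join first (100 x 1000 = 100,000 intermediate rows), filter after.
unoptimized = [r for r in join(left, right) if pred(r)]

# With pushdown: filter the right side first (10 rows survive), then join.
optimized = join(left, [r for r in right if pred(r)])

assert unoptimized == optimized    # same results, far fewer intermediate rows
assert len(optimized) == 100 * 10  # 1,000 rows instead of 100,000 intermediates
```

The same arithmetic at full scale gives 100,000 x 10,000 = 1,000,000,000 intermediate rows without pushdown versus 100,000 x 10 = 1,000,000 with it.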

Right-pushable filters with optimization
Running filtered join query...
test pushable join with filter on keyword: {"took":125,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":124,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":124,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":121,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":134,"documents_found":100000,"values":[[1000000]]}

Right-pushable filters without optimization
Running filtered join query...
test pushable join with filter on keyword: {"took":11315,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11348,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11330,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11271,"documents_found":100000,"values":[[1000000]]}
test pushable join with filter on keyword: {"took":11258,"documents_found":100000,"values":[[1000000]]}

Script

#!/bin/bash

passwd="redacted"

# Cleanup and create test_left index
curl -sk -uelastic:$passwd -HContent-Type:application/json -XDELETE http://localhost:9200/test_left
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPUT http://localhost:9200/test_left -d'{
    "settings": {
        "index.refresh_interval": -1
    },
    "mappings": {
        "properties": {
            "join_key": { "type": "keyword" },
            "value_left": { "type": "keyword" }
        }
    }
}'

# Cleanup and create test_right index
curl -sk -uelastic:$passwd -HContent-Type:application/json -XDELETE http://localhost:9200/test_right
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPUT http://localhost:9200/test_right -d'{
    "settings": {
        "index.refresh_interval": -1,
        "index.mode": "lookup"
    },
    "mappings": {
        "properties": {
            "join_key": { "type": "keyword" },
            "filter_field_kw": { "type": "keyword" },
            "filter_field_int": { "type": "integer" }
        }
    }
}'

# Populate test_left with 100,000 documents
echo "Populating test_left..."
counter=1
for a in {1..100}; do
    rm -f /tmp/bulk_left
    for b in {1..1000}; do
        echo '{"index":{"_index":"test_left"}}' >> /tmp/bulk_left
        printf '{"join_key":"A", "value_left":"text %d"}\n' "$counter" >> /tmp/bulk_left
        counter=$((counter + 1))
    done
    printf "test_left: batch %02d/100 " $a
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/_bulk?pretty --data-binary @/tmp/bulk_left)
    error=$(echo "$response" | jq -c '.errors')
    if [ "$error" != "false" ]; then
        echo "$response" | jq -c '.items[] | select(.index.error != null) | .index.error'
    else
        echo "OK"
    fi
done

# Populate test_right with 10,000 documents
echo "Populating test_right..."
# Batch 1: 10 'match' and 990 'no_match'
rm -f /tmp/bulk_right
for i in {1..10}; do
    echo '{"index":{"_index":"test_right"}}' >> /tmp/bulk_right
    printf '{"join_key":"A", "filter_field_kw":"match", "filter_field_int":1}\n' >> /tmp/bulk_right
done
for i in {1..990}; do
    echo '{"index":{"_index":"test_right"}}' >> /tmp/bulk_right
    printf '{"join_key":"A", "filter_field_kw":"no_match", "filter_field_int":2}\n' >> /tmp/bulk_right
done
printf "test_right: batch 01/10 "
response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/_bulk?pretty --data-binary @/tmp/bulk_right)
error=$(echo "$response" | jq -c '.errors')
if [ "$error" != "false" ]; then
    echo "$response" | jq -c '.items[] | select(.index.error != null) | .index.error'
else
    echo "OK"
fi

# Batches 2-10: 1000 'no_match' each
for a in {2..10}; do
    rm -f /tmp/bulk_right
    for b in {1..1000}; do
        echo '{"index":{"_index":"test_right"}}' >> /tmp/bulk_right
        printf '{"join_key":"A", "filter_field_kw":"no_match", "filter_field_int":2}\n' >> /tmp/bulk_right
    done
    printf "test_right: batch %02d/10 " $a
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/_bulk?pretty --data-binary @/tmp/bulk_right)
    error=$(echo "$response" | jq -c '.errors')
    if [ "$error" != "false" ]; then
        echo "$response" | jq -c '.items[] | select(.index.error != null) | .index.error'
    else
        echo "OK"
    fi
done



# Force merge and refresh both indices
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/test_left,test_right/_forcemerge?max_num_segments=1
curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST http://localhost:9200/test_left,test_right/_refresh
echo
curl -sk -uelastic:$passwd http://localhost:9200/_cat/indices?v

test_join_with_filter_pushable_keyword() {
    echo -n "test pushable join with filter on keyword: "
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'http://localhost:9200/_query?pretty' -d'{ "query": "FROM test_left | LOOKUP JOIN test_right ON join_key | WHERE filter_field_kw == \"match\" | STATS count=COUNT(*)", "pragma": { "data_partitioning": "shard" } }')
    error=$(echo "$response" | jq -c '.error')
    if [ "$error" != "null" ]; then
        echo "$error"
    else
        echo "$response" | jq -c '{took, documents_found, values}'
    fi
}

test_join_with_filter_pushable_int() {
    echo -n "test pushable join with filter on int: "
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'http://localhost:9200/_query?pretty' -d'{ "query": "FROM test_left | LOOKUP JOIN test_right ON join_key | WHERE filter_field_int == 1 | STATS count=COUNT(*)", "pragma": { "data_partitioning": "shard" } }')
    error=$(echo "$response" | jq -c '.error')
    if [ "$error" != "null" ]; then
        echo "$error"
    else
        echo "$response" | jq -c '{took, documents_found, values}'
    fi
}

test_join_with_filter_non_pushable_int() {
    echo -n "test non-pushable join with filter on int: "
    response=$(curl -sk -uelastic:$passwd -HContent-Type:application/json -XPOST 'http://localhost:9200/_query?pretty' -d'{ "query": "FROM test_left | LOOKUP JOIN test_right ON join_key | WHERE ABS(filter_field_int) == 1 | STATS count=COUNT(*)", "pragma": { "data_partitioning": "shard" } }')
    error=$(echo "$response" | jq -c '.error')
    if [ "$error" != "null" ]; then
        echo "$error"
    else
        echo "$response" | jq -c '{took, documents_found, values}'
    fi
}
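These three helpers differ only in whether the WHERE predicate can be pushed to Lucene: comparing a bare field to a literal (`filter_field_kw == "match"`, `filter_field_int == 1`) maps directly to an index query, while wrapping the field in a function (`ABS(filter_field_int) == 1`) does not. A toy sketch of that distinction (hypothetical Python, not the actual planner logic):

```python
# A predicate is modeled as (lhs, op, rhs). Field references are
# ("field", name); function calls are ("call", fn_name, arg). An index
# like Lucene can answer "field == literal" straight from its terms or
# points, but "f(field) == literal" must be evaluated row by row after
# the field values are loaded, so it is not pushable.
def is_pushable(predicate):
    lhs, _, rhs = predicate
    return lhs[0] == "field" and not isinstance(rhs, tuple)

assert is_pushable((("field", "filter_field_kw"), "==", "match"))
assert is_pushable((("field", "filter_field_int"), "==", 1))
assert not is_pushable((("call", "ABS", ("field", "filter_field_int")), "==", 1))
```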

echo
echo "Running filtered join query..."
for a in {1..5}; do
    test_join_with_filter_pushable_keyword
done

);
builder.append(
new FieldAttribute(Source.EMPTY, "Positions", new EsField("Positions", DataType.INTEGER, Collections.emptyMap(), false))
);
julian-elastic (Contributor Author):

@nik9000
FilterOperator.FilterOperatorFactory needs an ExpressionEvaluator.Factory, which needs a Layout.

How do I build a Layout here? I attached the 2 columns from the EnrichQuerySourceOperator and then whatever else we have in request.extractFields. It seems to work, because we don't refer to the first 2 columns.

EnrichQuerySourceOperator says there are 2 columns, so I added them, but I'm not sure what I have is correct. In particular, the Docs column should be a vector, right? But there is no vector data type.

Member:

These come from LocalExecutionPlanner usually.

Member:

So I suppose you could copy stuff from there. Or use it somehow.

}

var evaluatorFactory = EvalMapper.toEvaluator(
FoldContext.small()/*is this correct*/,
julian-elastic (Contributor Author):

@nik9000 Should I use FoldContext.small() here?

Member:

This'll be used when folding arguments - though maybe we'd never need any memory because we've already folded to literals. Imagine:

HASH(v, "md5") == "whatever"

The "md5" gets folded.
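The folding being discussed can be sketched with a toy constant-folding pass (hypothetical Python, not the ES|QL planner): calls whose arguments are all literals are evaluated once at plan time, while anything referencing a column is left for the per-row evaluator.

```python
import hashlib

# Expressions: ("col", name) is a field reference; ("call", fn, args) is a
# function call; anything else is a literal.
def is_literal(e):
    return not isinstance(e, tuple)

def fold(expr):
    """Evaluate any call whose arguments are all literals, once, at plan time."""
    if isinstance(expr, tuple) and expr[0] == "call":
        _, fn, args = expr
        args = [fold(a) for a in args]
        if all(is_literal(a) for a in args):
            return fn(*args)          # folded during planning, never re-run per row
        return ("call", fn, args)
    return expr

md5_hex = lambda s: hashlib.md5(s.encode()).hexdigest()

# Like HASH(v, "md5") == "whatever": the column reference blocks folding...
assert fold(("call", md5_hex, [("col", "v")])) == ("call", md5_hex, [("col", "v")])
# ...but a literal-only call folds to its value at plan time.
assert fold(("call", md5_hex, ["whatever"])) == hashlib.md5(b"whatever").hexdigest()
```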

@julian-elastic julian-elastic self-assigned this Aug 13, 2025
@julian-elastic julian-elastic added :Analytics/ES|QL AKA ESQL :Performance All issues related to Elasticsearch performance including regressions and investigations >enhancement Team:ES|QL labels Aug 13, 2025
@elasticsearchmachine

Hi @julian-elastic, I've created a changelog YAML for you.

@julian-elastic julian-elastic added Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) and removed Team:ES|QL labels Aug 14, 2025
@julian-elastic julian-elastic removed the :Performance All issues related to Elasticsearch performance including regressions and investigations label Aug 16, 2025
} else {
inputOperator = queryOperator;
}
Operator postJoinFilter = filterExecOperator(filterExec, inputOperator, shardContext.context, driverContext, builder);
julian-elastic (Contributor Author):

@nik9000 Is this how I need to build the filter? It does not seem to be working; I am debugging, but I could use some help if you spot any issues.

Member:

Seems sane.

@julian-elastic julian-elastic changed the title Improve Expanding Lookup Join performance by pushing a filter to the lookup join Improve Expanding Lookup Join performance by pushing a filter to the right side of the lookup join Aug 16, 2025
@julian-elastic
Copy link
Contributor Author

To be merged with #133166.

Labels
:Analytics/ES|QL AKA ESQL >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.2.0