Skip to content

Commit ed6946b

Browse files
Enhance LookupFromIndexIT
1 parent f742160 commit ed6946b

File tree

1 file changed

+128
-42
lines changed

1 file changed

+128
-42
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 128 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -82,71 +82,138 @@
8282

8383
public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
8484
public void testKeywordKey() throws IOException {
85-
runLookup(DataType.KEYWORD, new UsingSingleLookupTable(new String[] { "aa", "bb", "cc", "dd" }));
85+
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" } }));
86+
}
87+
88+
public void testJoinOnTwoKeys() throws IOException {
89+
runLookup(
90+
List.of(DataType.KEYWORD, DataType.LONG),
91+
new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "cc", "dd" }, new Long[] { 12L, 33L, 1L, 42L } })
92+
);
93+
}
94+
95+
public void testJoinOnThreeKeys() throws IOException {
96+
runLookup(
97+
List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD),
98+
new UsingSingleLookupTable(
99+
new Object[][] {
100+
new String[] { "aa", "bb", "cc", "dd" },
101+
new Long[] { 12L, 33L, 1L, 42L },
102+
new String[] { "one", "two", "three", "four" }, }
103+
)
104+
);
105+
}
106+
107+
public void testJoinOnFourKeys() throws IOException {
108+
runLookup(
109+
List.of(DataType.KEYWORD, DataType.LONG, DataType.KEYWORD, DataType.INTEGER),
110+
new UsingSingleLookupTable(
111+
new Object[][] {
112+
new String[] { "aa", "bb", "cc", "dd" },
113+
new Long[] { 12L, 33L, 1L, 42L },
114+
new String[] { "one", "two", "three", "four" },
115+
new Integer[] { 1, 2, 3, 4 }, }
116+
)
117+
);
86118
}
87119

88120
public void testLongKey() throws IOException {
89-
runLookup(DataType.LONG, new UsingSingleLookupTable(new Long[] { 12L, 33L, 1L }));
121+
runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }));
90122
}
91123

92124
/**
93125
* LOOKUP multiple results match.
94126
*/
95127
public void testLookupIndexMultiResults() throws IOException {
96-
runLookup(DataType.KEYWORD, new UsingSingleLookupTable(new String[] { "aa", "bb", "bb", "dd" }));
128+
runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }));
97129
}
98130

99131
interface PopulateIndices {
100132
void populate(int docCount, List<String> expected) throws IOException;
101133
}
102134

103135
class UsingSingleLookupTable implements PopulateIndices {
104-
private final Map<Object, List<Integer>> matches = new HashMap<>();
105-
private final Object[] lookupData;
136+
private final Map<List<Object>, List<Integer>> matches = new HashMap<>();
137+
private final Object[][] lookupData;
106138

107-
UsingSingleLookupTable(Object[] lookupData) {
139+
// Accepts array of arrays, each sub-array is values for a key field
140+
// All subarrays must have the same length
141+
UsingSingleLookupTable(Object[][] lookupData) {
108142
this.lookupData = lookupData;
109-
for (int i = 0; i < lookupData.length; i++) {
110-
matches.computeIfAbsent(lookupData[i], k -> new ArrayList<>()).add(i);
143+
int numRows = lookupData[0].length;
144+
for (int i = 0; i < numRows; i++) {
145+
List<Object> key = new ArrayList<>();
146+
for (Object[] col : lookupData) {
147+
key.add(col[i]);
148+
}
149+
matches.computeIfAbsent(key, k -> new ArrayList<>()).add(i);
111150
}
112151
}
113152

114153
@Override
115154
public void populate(int docCount, List<String> expected) {
116155
List<IndexRequestBuilder> docs = new ArrayList<>();
156+
int numFields = lookupData.length;
157+
int numRows = lookupData[0].length;
117158
for (int i = 0; i < docCount; i++) {
118-
Object key = lookupData[i % lookupData.length];
119-
docs.add(client().prepareIndex("source").setSource(Map.of("key", key)));
159+
List<Object> key = new ArrayList<>();
160+
Map<String, Object> sourceDoc = new HashMap<>();
161+
for (int f = 0; f < numFields; f++) {
162+
Object val = lookupData[f][i % numRows];
163+
key.add(val);
164+
sourceDoc.put("key" + f, val);
165+
}
166+
docs.add(client().prepareIndex("source").setSource(sourceDoc));
167+
String keyString;
168+
if (key.size() == 1) {
169+
keyString = String.valueOf(key.get(0));
170+
} else {
171+
keyString = String.join(",", key.stream().map(String::valueOf).toArray(String[]::new));
172+
}
120173
for (Integer match : matches.get(key)) {
121-
expected.add(key + ":" + match);
174+
expected.add(keyString + ":" + match);
122175
}
123176
}
124-
for (int i = 0; i < lookupData.length; i++) {
125-
docs.add(client().prepareIndex("lookup").setSource(Map.of("key", lookupData[i], "l", i)));
177+
for (int i = 0; i < numRows; i++) {
178+
Map<String, Object> lookupDoc = new HashMap<>();
179+
for (int f = 0; f < numFields; f++) {
180+
lookupDoc.put("key" + f, lookupData[f][i]);
181+
}
182+
lookupDoc.put("l", i);
183+
docs.add(client().prepareIndex("lookup").setSource(lookupDoc));
126184
}
127185
Collections.sort(expected);
128186
indexRandom(true, true, docs);
129187
}
130188
}
131189

132-
private void runLookup(DataType keyType, PopulateIndices populateIndices) throws IOException {
190+
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices) throws IOException {
191+
String[] fieldMappers = new String[keyTypes.size() * 2];
192+
for (int i = 0; i < keyTypes.size(); i++) {
193+
fieldMappers[2 * i] = "key" + i;
194+
fieldMappers[2 * i + 1] = "type=" + keyTypes.get(i).esType();
195+
}
133196
client().admin()
134197
.indices()
135198
.prepareCreate("source")
136199
.setSettings(Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1))
137-
.setMapping("key", "type=" + keyType.esType())
138-
.get();
139-
client().admin()
140-
.indices()
141-
.prepareCreate("lookup")
142-
.setSettings(
143-
Settings.builder()
144-
.put(IndexSettings.MODE.getKey(), "lookup")
145-
// TODO lookup index mode doesn't seem to force a single shard. That'll break the lookup command.
146-
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
147-
)
148-
.setMapping("key", "type=" + keyType.esType(), "l", "type=long")
200+
.setMapping(fieldMappers)
149201
.get();
202+
203+
Settings.Builder lookupSettings = Settings.builder()
204+
.put(IndexSettings.MODE.getKey(), "lookup")
205+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1);
206+
{
207+
String[] lookupMappers = new String[keyTypes.size() * 2 + 2];
208+
int i = 0;
209+
for (; i < keyTypes.size(); i++) {
210+
lookupMappers[2 * i] = "key" + i;
211+
lookupMappers[2 * i + 1] = "type=" + keyTypes.get(i).esType();
212+
}
213+
lookupMappers[2 * i] = "l";
214+
lookupMappers[2 * i + 1] = "type=long";
215+
client().admin().indices().prepareCreate("lookup").setSettings(lookupSettings).setMapping(lookupMappers).get();
216+
}
150217
client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();
151218

152219
int docCount = between(10, 1000);
@@ -198,15 +265,20 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws
198265
DocIdSetIterator.NO_MORE_DOCS,
199266
false // no scoring
200267
);
201-
ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
202-
PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
203-
List.of(
268+
List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>();
269+
for (int i = 0; i < keyTypes.size(); i++) {
270+
final int idx = i;
271+
fieldInfos.add(
204272
new ValuesSourceReaderOperator.FieldInfo(
205-
"key",
206-
PlannerUtils.toElementType(keyType),
207-
shard -> searchContext.getSearchExecutionContext().getFieldType("key").blockLoader(blContext())
273+
"key" + idx,
274+
PlannerUtils.toElementType(keyTypes.get(idx)),
275+
shard -> searchContext.getSearchExecutionContext().getFieldType("key" + idx).blockLoader(blContext())
208276
)
209-
),
277+
);
278+
}
279+
ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
280+
PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
281+
fieldInfos,
210282
List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
211283
throw new IllegalStateException("can't load source here");
212284
}, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
@@ -225,7 +297,9 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws
225297
);
226298
final String finalNodeWithShard = nodeWithShard;
227299
List<LookupFromIndexOperator.MatchConfig> matchFields = new ArrayList<>();
228-
matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName("key"), 1, keyType));
300+
for (int i = 0; i < keyTypes.size(); i++) {
301+
matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName("key" + i), i + 1, keyTypes.get(i)));
302+
}
229303
LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
230304
matchFields,
231305
"test",
@@ -245,16 +319,28 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws
245319
List.of(reader.get(driverContext), lookup.get(driverContext)),
246320
new PageConsumerOperator(page -> {
247321
try {
248-
Block keyBlock = page.getBlock(1);
249-
LongVector loadedBlock = page.<LongBlock>getBlock(2).asVector();
322+
List<Block> keyBlocks = new ArrayList<>();
323+
for (int i = 0; i < keyTypes.size(); i++) {
324+
keyBlocks.add(page.getBlock(i + 1));
325+
}
326+
LongVector loadedBlock = page.<LongBlock>getBlock(keyTypes.size() + 1).asVector();
250327
for (int p = 0; p < page.getPositionCount(); p++) {
251-
List<Object> key = BlockTestUtils.valuesAtPositions(keyBlock, p, p + 1).get(0);
252-
assertThat(key, hasSize(1));
253-
Object keyValue = key.get(0);
254-
if (keyValue instanceof BytesRef b) {
255-
keyValue = b.utf8ToString();
328+
StringBuilder result = new StringBuilder();
329+
for (int j = 0; j < keyBlocks.size(); j++) {
330+
List<Object> key = BlockTestUtils.valuesAtPositions(keyBlocks.get(j), p, p + 1).get(0);
331+
assertThat(key, hasSize(1));
332+
Object keyValue = key.get(0);
333+
if (keyValue instanceof BytesRef b) {
334+
keyValue = b.utf8ToString();
335+
}
336+
result.append(keyValue);
337+
if (j < keyBlocks.size() - 1) {
338+
result.append(",");
339+
}
340+
256341
}
257-
results.add(keyValue + ":" + loadedBlock.getLong(p));
342+
result.append(":" + loadedBlock.getLong(p));
343+
results.add(result.toString());
258344
}
259345
} finally {
260346
page.releaseBlocks();

0 commit comments

Comments
 (0)