Skip to content

added doc key in the update_by_query method , so that we can this met… #131167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131167.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
type: enhancement
area: "Ingest"
issues: []
release_note: >
Added `doc` field wrapper in `update_by_query` scripts to match the `update` API behavior.
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reindex;

import org.apache.logging.log4j.Logger;
Expand All @@ -26,7 +17,6 @@
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.script.CtxMap;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
Expand All @@ -49,7 +39,6 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
private final ClusterService clusterService;
private final UpdateByQueryMetrics updateByQueryMetrics;

@Inject
public TransportUpdateByQueryAction(
ThreadPool threadPool,
ActionFilters actionFilters,
Expand All @@ -69,6 +58,25 @@ public TransportUpdateByQueryAction(

@Override
protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
// ✅ Convert doc to script if needed
if (request.getScript() == null && request.getDoc() != null) {
StringBuilder scriptBuilder = new StringBuilder();
for (Map.Entry<String, Object> entry : request.getDoc().entrySet()) {
scriptBuilder.append("ctx._source.")
.append(entry.getKey())
.append(" = params.")
.append(entry.getKey())
.append("; ");
}
Script generatedScript = new Script(
Script.DEFAULT_SCRIPT_TYPE,
Script.DEFAULT_SCRIPT_LANG,
scriptBuilder.toString().trim(),
request.getDoc()
);
request.setScript(generatedScript);
}

BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
long startTime = System.nanoTime();
BulkByScrollParallelizationHelper.startSlicedAction(
Expand Down Expand Up @@ -104,9 +112,6 @@ protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener
);
}

/**
* Simple implementation of update-by-query using scrolling and bulk.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest, TransportUpdateByQueryAction> {

AsyncIndexBySearchAction(
Expand All @@ -121,7 +126,6 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Up
) {
super(
task,
// use sequence number powered optimistic concurrency control unless requested
request.getSearchRequest().source() != null && Boolean.TRUE.equals(request.getSearchRequest().source().version()),
true,
true,
Expand Down Expand Up @@ -191,7 +195,7 @@ protected CtxMap<UpdateByQueryMetadata> execute(ScrollableHitSource.Hit doc, Map

@Override
protected void updateRequest(RequestWrapper<?> request, UpdateByQueryMetadata metadata) {
// do nothing
// no-op
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,28 @@ public void testSlices() throws Exception {
assertEquals(2, client().prepareGet("test", "3").get().getVersion());
assertEquals(2, client().prepareGet("test", "4").get().getVersion());
}
public void testUpdateByQueryWithDocField() throws Exception {
String index = "test-doc-update";
createIndex(index);

// Index a sample document
client().prepareIndex(index).setId("1").setSource("counter", 1, "tag", "python").get();
refresh(index);

// Run update_by_query with doc field (instead of script)
UpdateByQueryRequestBuilder updateRequest = new UpdateByQueryRequestBuilder(client())
.source(index)
.setDoc(Map.of("counter", 2)) // <- using doc instead of script
.filter(QueryBuilders.termQuery("tag", "python"));

BulkByScrollResponse response = updateRequest.get();
assertEquals(1L, response.getUpdated()); // One document should be updated

// Fetch and verify
GetResponse updatedDoc = client().prepareGet(index, "1").get();
assertEquals(2, updatedDoc.getSource().get("counter"));
}


public void testMultipleSources() throws Exception {
int sourceIndices = between(2, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;

/**
* Request to update some documents. That means you can't change their type, id, index, or anything like that. This implements
Expand All @@ -31,11 +33,17 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
implements
IndicesRequest.Replaceable,
ToXContentObject {

/**
* Ingest pipeline to set on index requests made by this action.
*/
private String pipeline;

/**
* Optional doc field to allow simplified partial updates.
*/
private Map<String, Object> doc;

public UpdateByQueryRequest() {
this(new SearchRequest());
}
Expand All @@ -51,6 +59,7 @@ public UpdateByQueryRequest(String... indices) {
public UpdateByQueryRequest(StreamInput in) throws IOException {
super(in);
pipeline = in.readOptionalString();
doc = in.readMap(StreamInput::readString, StreamInput::readGenericValue); // Deserialize doc
}

UpdateByQueryRequest(SearchRequest search, boolean setDefaults) {
Expand All @@ -65,6 +74,21 @@ public UpdateByQueryRequest setPipeline(String pipeline) {
return this;
}

/**
* Optional doc to be applied to matched documents.
*/
public UpdateByQueryRequest setDoc(Map<String, Object> doc) {
this.doc = doc;
return this;
}

/**
* Get the doc used for partial update, if present.
*/
public Map<String, Object> getDoc() {
return doc;
}

/**
* Set the query for selective update
*/
Expand Down Expand Up @@ -136,6 +160,7 @@ public boolean includeDataStreams() {
public UpdateByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) {
UpdateByQueryRequest request = doForSlice(new UpdateByQueryRequest(slice, false), slicingTask, totalSlices);
request.setPipeline(pipeline);
request.setDoc(doc); // Ensure doc is copied to sliced request
return request;
}

Expand Down Expand Up @@ -172,6 +197,7 @@ public IndicesOptions indicesOptions() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(pipeline);
out.writeMap(doc, StreamOutput::writeString, StreamOutput::writeGenericValue); // Serialize doc
}

@Override
Expand All @@ -181,6 +207,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("script");
getScript().toXContent(builder, params);
}
if (doc != null) {
builder.field("doc", doc); // Include doc in output
}
getSearchRequest().source().innerToXContent(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.internal.ElasticsearchClient;

import java.util.Map;

public class UpdateByQueryRequestBuilder extends AbstractBulkIndexByScrollRequestBuilder<
UpdateByQueryRequest,
UpdateByQueryRequestBuilder> {

private Boolean abortOnVersionConflict;
private String pipeline;

private Map<String, Object> doc; // Added

public UpdateByQueryRequestBuilder(ElasticsearchClient client) {
this(client, new SearchRequestBuilder(client));
}
Expand All @@ -44,6 +48,12 @@ public UpdateByQueryRequestBuilder setPipeline(String pipeline) {
return this;
}

// NEW: Add setter for doc
public UpdateByQueryRequestBuilder setDoc(Map<String, Object> doc) {
this.doc = doc;
return this;
}

@Override
public UpdateByQueryRequest request() {
SearchRequest search = source().request();
Expand Down Expand Up @@ -71,5 +81,8 @@ public void apply(UpdateByQueryRequest request) {
if (pipeline != null) {
request.setPipeline(pipeline);
}
if (doc != null) { // Apply doc field
request.setDoc(doc);
}
}
}
Loading