-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Simulate impact of shard movement using shard-level write load #131406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
nicktindall
merged 34 commits into
elastic:main
from
nicktindall:ES-12000_add_write_load_modeling_to_balancer
Jul 29, 2025
+310
−3
Merged
Changes from all commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
0ebd75d
Estimate impact of shard movement using node-level write load
nicktindall e589db4
Naming
nicktindall beb2611
More randomness
nicktindall 4e0fd1d
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall 3f90889
Pedantry
nicktindall 0b1d4a2
Naming
nicktindall 9527720
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall 9e36975
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall 9ca9b4b
Use shard write loads instead of estimating
nicktindall 988ac3c
Add javadoc to WriteLoadPerShardSimulator
nicktindall f5ed735
Explain simulateShardStarted better for the new shard case
nicktindall 8501b37
Assert on scale of utilisation change
nicktindall 8c21cc0
Improve description of relocation
nicktindall 519d1dd
Typo
nicktindall 58e84a2
Rename test to indicate it also tests missing write loads
nicktindall faccc3d
Always simulate based on original write loads and thread pool stats
nicktindall edc259a
Use for-loop instead of stream
nicktindall f60029f
Consolidate similar tests
nicktindall 94687e3
Naming/description of nodeUsageStatsForThreadPools
nicktindall 466a7e0
Naming of test utility methods
nicktindall 827f637
WriteLoadPerShardSimulator -> ShardMovementWriteLoadSimulator
nicktindall 1c876e7
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall a072aaf
Increase likelihood of write loads and utilizations being 0, floor ut…
nicktindall 6ae4aa4
Pedantry
nicktindall 1dbee7f
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall a4d89b9
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall 89c4d28
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall 770e04d
Assert that shardStarted only happens on destination node ina reloca…
nicktindall 2b3ebf6
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall 34472f6
Typo
nicktindall cf81780
Update server/src/main/java/org/elasticsearch/cluster/routing/ShardMo…
nicktindall 27a9ba0
Update server/src/main/java/org/elasticsearch/cluster/routing/ShardMo…
nicktindall 1e0a4a0
Naming
nicktindall 38c79a7
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
98 changes: 98 additions & 0 deletions
98
server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.cluster.routing; | ||
|
||
import com.carrotsearch.hppc.ObjectDoubleHashMap; | ||
import com.carrotsearch.hppc.ObjectDoubleMap; | ||
|
||
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; | ||
import org.elasticsearch.common.util.Maps; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
/** | ||
* Simulates the impact to each node's write-load in response to the movement of individual | ||
* shards around the cluster. | ||
*/ | ||
public class ShardMovementWriteLoadSimulator { | ||
|
||
private final Map<String, NodeUsageStatsForThreadPools> originalNodeUsageStatsForThreadPools; | ||
private final ObjectDoubleMap<String> simulatedNodeWriteLoadDeltas; | ||
private final Map<ShardId, Double> writeLoadsPerShard; | ||
|
||
public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) { | ||
this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools(); | ||
this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads(); | ||
this.simulatedNodeWriteLoadDeltas = new ObjectDoubleHashMap<>(); | ||
} | ||
|
||
public void simulateShardStarted(ShardRouting shardRouting) { | ||
final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); | ||
if (writeLoadForShard != null) { | ||
if (shardRouting.relocatingNodeId() != null) { | ||
assert shardRouting.state() == ShardRoutingState.INITIALIZING | ||
: "This should only be happening on the destination node (the source node will have status RELOCATING)"; | ||
// This is a shard being relocated | ||
simulatedNodeWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); | ||
simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); | ||
} else { | ||
// This is a new shard starting, it's unlikely we'll have a write-load value for a new | ||
// shard, but we may be able to estimate if the new shard is created as part of a datastream | ||
// rollover. See https://elasticco.atlassian.net/browse/ES-12469 | ||
simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Apply the simulated shard movements to the original thread pool usage stats for each node. | ||
*/ | ||
public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThreadPools() { | ||
final Map<String, NodeUsageStatsForThreadPools> adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize( | ||
originalNodeUsageStatsForThreadPools.size() | ||
); | ||
for (Map.Entry<String, NodeUsageStatsForThreadPools> entry : originalNodeUsageStatsForThreadPools.entrySet()) { | ||
if (simulatedNodeWriteLoadDeltas.containsKey(entry.getKey())) { | ||
var adjustedValue = new NodeUsageStatsForThreadPools( | ||
entry.getKey(), | ||
Maps.copyMapWithAddedOrReplacedEntry( | ||
entry.getValue().threadPoolUsageStatsMap(), | ||
ThreadPool.Names.WRITE, | ||
replaceWritePoolStats(entry.getValue(), simulatedNodeWriteLoadDeltas.get(entry.getKey())) | ||
) | ||
); | ||
adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue); | ||
} else { | ||
adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue()); | ||
} | ||
} | ||
return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools); | ||
} | ||
|
||
private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( | ||
NodeUsageStatsForThreadPools value, | ||
double writeLoadDelta | ||
) { | ||
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() | ||
.get(ThreadPool.Names.WRITE); | ||
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||
writeThreadPoolStats.totalThreadPoolThreads(), | ||
(float) Math.max( | ||
(writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())), | ||
0.0 | ||
), | ||
writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() | ||
); | ||
} | ||
} |
207 changes: 207 additions & 0 deletions
207
...src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.cluster.routing; | ||
|
||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; | ||
import org.elasticsearch.cluster.ClusterInfo; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||
import org.elasticsearch.cluster.metadata.ProjectId; | ||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; | ||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.snapshots.SnapshotShardSizeInfo; | ||
import org.elasticsearch.test.ESTestCase; | ||
import org.hamcrest.Matchers; | ||
|
||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
import java.util.stream.StreamSupport; | ||
|
||
import static org.hamcrest.Matchers.closeTo; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.sameInstance; | ||
|
||
public class ShardMovementWriteLoadSimulatorTests extends ESTestCase { | ||
|
||
private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { | ||
}; | ||
private static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" }; | ||
|
||
/** | ||
* We should not adjust the values if there's no movement | ||
*/ | ||
public void testNoShardMovement() { | ||
final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats(); | ||
final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats(); | ||
final var allocation = createRoutingAllocationWithRandomisedWriteLoads( | ||
originalNode0ThreadPoolStats, | ||
originalNode1ThreadPoolStats, | ||
Set.of() | ||
); | ||
|
||
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); | ||
final var calculatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); | ||
assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); | ||
assertThat( | ||
calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"), | ||
sameInstance(originalNode0ThreadPoolStats) | ||
); | ||
assertThat( | ||
calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"), | ||
sameInstance(originalNode1ThreadPoolStats) | ||
); | ||
} | ||
|
||
public void testMovementOfAShardWillMoveThreadPoolUtilisation() { | ||
final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats(); | ||
final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats(); | ||
final var allocation = createRoutingAllocationWithRandomisedWriteLoads( | ||
originalNode0ThreadPoolStats, | ||
originalNode1ThreadPoolStats, | ||
Set.of() | ||
); | ||
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); | ||
|
||
// Relocate a random shard from node_0 to node_1 | ||
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); | ||
final var expectedShardSize = randomNonNegativeLong(); | ||
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); | ||
shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); | ||
final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); | ||
|
||
final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); | ||
assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); | ||
|
||
final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); | ||
final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads(); | ||
final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads(); | ||
|
||
// Some node_0 utilization should have been moved to node_1 | ||
if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) { | ||
// We don't return utilization less than zero because that makes no sense | ||
assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f)); | ||
} else { | ||
assertThat( | ||
(double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( | ||
shardMovementWriteLoadSimulator, | ||
"node_0" | ||
), | ||
closeTo(expectedUtilisationReductionAtSource, 0.001f) | ||
); | ||
} | ||
assertThat( | ||
(double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats | ||
.averageThreadPoolUtilization(), | ||
closeTo(expectedUtilisationIncreaseAtDestination, 0.001f) | ||
); | ||
|
||
// Then move it back | ||
final var moveBackTuple = allocation.routingNodes() | ||
.relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP); | ||
shardMovementWriteLoadSimulator.simulateShardStarted(moveBackTuple.v2()); | ||
|
||
// The utilization numbers should return to their original values | ||
assertThat( | ||
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), | ||
equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization()) | ||
); | ||
assertThat( | ||
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"), | ||
equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization()) | ||
); | ||
} | ||
|
||
public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { | ||
final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; | ||
final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; | ||
final var allocation = createRoutingAllocationWithRandomisedWriteLoads( | ||
originalNode0ThreadPoolStats, | ||
originalNode1ThreadPoolStats, | ||
new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))) | ||
); | ||
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); | ||
|
||
// Relocate a random shard from node_0 to node_1 | ||
final var expectedShardSize = randomNonNegativeLong(); | ||
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); | ||
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); | ||
shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); | ||
allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); | ||
|
||
final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); | ||
assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null)); | ||
assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null)); | ||
} | ||
|
||
private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) { | ||
final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); | ||
final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write"); | ||
return node0WritePoolStats.averageThreadPoolUtilization(); | ||
} | ||
|
||
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() { | ||
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||
randomIntBetween(4, 16), | ||
randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true), | ||
randomLongBetween(0, 60_000) | ||
); | ||
} | ||
|
||
private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( | ||
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0ThreadPoolStats, | ||
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1ThreadPoolStats, | ||
Set<String> indicesWithNoWriteLoad | ||
) { | ||
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>(); | ||
if (node0ThreadPoolStats != null) { | ||
nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0ThreadPoolStats))); | ||
} | ||
if (node1ThreadPoolStats != null) { | ||
nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1ThreadPoolStats))); | ||
} | ||
|
||
final ClusterState clusterState = createClusterState(); | ||
final ClusterInfo clusterInfo = ClusterInfo.builder() | ||
.nodeUsageStatsForThreadPools(nodeUsageStats) | ||
.shardWriteLoads( | ||
clusterState.metadata() | ||
.getProject(ProjectId.DEFAULT) | ||
.stream() | ||
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false) | ||
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum))) | ||
.collect( | ||
Collectors.toUnmodifiableMap( | ||
shardId -> shardId, | ||
shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true) | ||
) | ||
) | ||
) | ||
.build(); | ||
|
||
return new RoutingAllocation( | ||
new AllocationDeciders(List.of()), | ||
clusterState, | ||
clusterInfo, | ||
SnapshotShardSizeInfo.EMPTY, | ||
System.nanoTime() | ||
).mutableCloneForSimulation(); | ||
} | ||
|
||
private ClusterState createClusterState() { | ||
return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0); | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a copy were made here, there would be no need to generate the node stats on demand. The original stats also aren't needed ever again in the original form.
The only reason to keep it I can think of would be stats reporting down the line -- e.g. what did this balancer round accomplish.
Anyway, more of a note than a request. Doesn't seem like a big deal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kept the originals + deltas so that we could just return the unmodified
NodeUsageStatsForThreadPools
for any nodes that weren't involved in shard movement. I think this should keep garbage to a minimum? Also if we keep adding and subtracting the write load values we might introduce rounding errors due to the arithmetic.