Skip to content

Commit ab2e654

Browse files
Simulate impact of shard movement using shard-level write load (#131406)
Co-authored-by: Dianna Hohensee <[email protected]>
1 parent 6fe2e86 commit ab2e654

File tree

3 files changed

+310
-3
lines changed

3 files changed

+310
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.cluster;
1111

1212
import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
13+
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
1314
import org.elasticsearch.cluster.routing.ShardRouting;
1415
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
1516
import org.elasticsearch.common.util.CopyOnFirstWriteMap;
@@ -34,7 +35,7 @@ public class ClusterInfoSimulator {
3435
private final Map<ShardId, Long> shardDataSetSizes;
3536
private final Map<NodeAndShard, String> dataPath;
3637
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
37-
private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
38+
private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator;
3839

3940
public ClusterInfoSimulator(RoutingAllocation allocation) {
4041
this.allocation = allocation;
@@ -44,7 +45,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
4445
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
4546
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
4647
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
47-
this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
48+
this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
4849
}
4950

5051
/**
@@ -115,6 +116,7 @@ public void simulateShardStarted(ShardRouting shard) {
115116
shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size);
116117
}
117118
}
119+
shardMovementWriteLoadSimulator.simulateShardStarted(shard);
118120
}
119121

120122
private void modifyDiskUsage(String nodeId, long freeDelta) {
@@ -159,7 +161,7 @@ public ClusterInfo getClusterInfo() {
159161
dataPath,
160162
Map.of(),
161163
estimatedHeapUsages,
162-
nodeThreadPoolUsageStats,
164+
shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(),
163165
allocation.clusterInfo().getShardWriteLoads()
164166
);
165167
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing;
11+
12+
import com.carrotsearch.hppc.ObjectDoubleHashMap;
13+
import com.carrotsearch.hppc.ObjectDoubleMap;
14+
15+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
16+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
17+
import org.elasticsearch.common.util.Maps;
18+
import org.elasticsearch.index.shard.ShardId;
19+
import org.elasticsearch.threadpool.ThreadPool;
20+
21+
import java.util.Collections;
22+
import java.util.Map;
23+
24+
/**
25+
* Simulates the impact to each node's write-load in response to the movement of individual
26+
* shards around the cluster.
27+
*/
28+
public class ShardMovementWriteLoadSimulator {
29+
30+
private final Map<String, NodeUsageStatsForThreadPools> originalNodeUsageStatsForThreadPools;
31+
private final ObjectDoubleMap<String> simulatedNodeWriteLoadDeltas;
32+
private final Map<ShardId, Double> writeLoadsPerShard;
33+
34+
public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) {
35+
this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools();
36+
this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads();
37+
this.simulatedNodeWriteLoadDeltas = new ObjectDoubleHashMap<>();
38+
}
39+
40+
public void simulateShardStarted(ShardRouting shardRouting) {
41+
final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
42+
if (writeLoadForShard != null) {
43+
if (shardRouting.relocatingNodeId() != null) {
44+
assert shardRouting.state() == ShardRoutingState.INITIALIZING
45+
: "This should only be happening on the destination node (the source node will have status RELOCATING)";
46+
// This is a shard being relocated
47+
simulatedNodeWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard);
48+
simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
49+
} else {
50+
// This is a new shard starting, it's unlikely we'll have a write-load value for a new
51+
// shard, but we may be able to estimate if the new shard is created as part of a datastream
52+
// rollover. See https://elasticco.atlassian.net/browse/ES-12469
53+
simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
54+
}
55+
}
56+
}
57+
58+
/**
59+
* Apply the simulated shard movements to the original thread pool usage stats for each node.
60+
*/
61+
public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThreadPools() {
62+
final Map<String, NodeUsageStatsForThreadPools> adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize(
63+
originalNodeUsageStatsForThreadPools.size()
64+
);
65+
for (Map.Entry<String, NodeUsageStatsForThreadPools> entry : originalNodeUsageStatsForThreadPools.entrySet()) {
66+
if (simulatedNodeWriteLoadDeltas.containsKey(entry.getKey())) {
67+
var adjustedValue = new NodeUsageStatsForThreadPools(
68+
entry.getKey(),
69+
Maps.copyMapWithAddedOrReplacedEntry(
70+
entry.getValue().threadPoolUsageStatsMap(),
71+
ThreadPool.Names.WRITE,
72+
replaceWritePoolStats(entry.getValue(), simulatedNodeWriteLoadDeltas.get(entry.getKey()))
73+
)
74+
);
75+
adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue);
76+
} else {
77+
adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue());
78+
}
79+
}
80+
return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools);
81+
}
82+
83+
private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
84+
NodeUsageStatsForThreadPools value,
85+
double writeLoadDelta
86+
) {
87+
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
88+
.get(ThreadPool.Names.WRITE);
89+
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
90+
writeThreadPoolStats.totalThreadPoolThreads(),
91+
(float) Math.max(
92+
(writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
93+
0.0
94+
),
95+
writeThreadPoolStats.averageThreadPoolQueueLatencyMillis()
96+
);
97+
}
98+
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing;
11+
12+
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
13+
import org.elasticsearch.cluster.ClusterInfo;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
17+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
18+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
19+
import org.elasticsearch.index.shard.ShardId;
20+
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
21+
import org.elasticsearch.test.ESTestCase;
22+
import org.hamcrest.Matchers;
23+
24+
import java.util.Arrays;
25+
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.IntStream;
32+
import java.util.stream.StreamSupport;
33+
34+
import static org.hamcrest.Matchers.closeTo;
35+
import static org.hamcrest.Matchers.equalTo;
36+
import static org.hamcrest.Matchers.sameInstance;
37+
38+
public class ShardMovementWriteLoadSimulatorTests extends ESTestCase {
39+
40+
private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
41+
};
42+
private static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" };
43+
44+
/**
45+
* We should not adjust the values if there's no movement
46+
*/
47+
public void testNoShardMovement() {
48+
final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
49+
final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
50+
final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
51+
originalNode0ThreadPoolStats,
52+
originalNode1ThreadPoolStats,
53+
Set.of()
54+
);
55+
56+
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
57+
final var calculatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
58+
assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2));
59+
assertThat(
60+
calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"),
61+
sameInstance(originalNode0ThreadPoolStats)
62+
);
63+
assertThat(
64+
calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"),
65+
sameInstance(originalNode1ThreadPoolStats)
66+
);
67+
}
68+
69+
public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
70+
final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
71+
final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
72+
final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
73+
originalNode0ThreadPoolStats,
74+
originalNode1ThreadPoolStats,
75+
Set.of()
76+
);
77+
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
78+
79+
// Relocate a random shard from node_0 to node_1
80+
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
81+
final var expectedShardSize = randomNonNegativeLong();
82+
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
83+
shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
84+
final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);
85+
86+
final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
87+
assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2));
88+
89+
final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId());
90+
final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads();
91+
final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads();
92+
93+
// Some node_0 utilization should have been moved to node_1
94+
if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) {
95+
// We don't return utilization less than zero because that makes no sense
96+
assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f));
97+
} else {
98+
assertThat(
99+
(double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization(
100+
shardMovementWriteLoadSimulator,
101+
"node_0"
102+
),
103+
closeTo(expectedUtilisationReductionAtSource, 0.001f)
104+
);
105+
}
106+
assertThat(
107+
(double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats
108+
.averageThreadPoolUtilization(),
109+
closeTo(expectedUtilisationIncreaseAtDestination, 0.001f)
110+
);
111+
112+
// Then move it back
113+
final var moveBackTuple = allocation.routingNodes()
114+
.relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP);
115+
shardMovementWriteLoadSimulator.simulateShardStarted(moveBackTuple.v2());
116+
117+
// The utilization numbers should return to their original values
118+
assertThat(
119+
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"),
120+
equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization())
121+
);
122+
assertThat(
123+
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"),
124+
equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization())
125+
);
126+
}
127+
128+
public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
129+
final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
130+
final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
131+
final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
132+
originalNode0ThreadPoolStats,
133+
originalNode1ThreadPoolStats,
134+
new HashSet<>(randomSubsetOf(Arrays.asList(INDICES)))
135+
);
136+
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
137+
138+
// Relocate a random shard from node_0 to node_1
139+
final var expectedShardSize = randomNonNegativeLong();
140+
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
141+
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
142+
shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
143+
allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);
144+
145+
final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
146+
assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null));
147+
assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null));
148+
}
149+
150+
private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) {
151+
final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
152+
final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write");
153+
return node0WritePoolStats.averageThreadPoolUtilization();
154+
}
155+
156+
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() {
157+
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
158+
randomIntBetween(4, 16),
159+
randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true),
160+
randomLongBetween(0, 60_000)
161+
);
162+
}
163+
164+
private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
165+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0ThreadPoolStats,
166+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1ThreadPoolStats,
167+
Set<String> indicesWithNoWriteLoad
168+
) {
169+
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
170+
if (node0ThreadPoolStats != null) {
171+
nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0ThreadPoolStats)));
172+
}
173+
if (node1ThreadPoolStats != null) {
174+
nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1ThreadPoolStats)));
175+
}
176+
177+
final ClusterState clusterState = createClusterState();
178+
final ClusterInfo clusterInfo = ClusterInfo.builder()
179+
.nodeUsageStatsForThreadPools(nodeUsageStats)
180+
.shardWriteLoads(
181+
clusterState.metadata()
182+
.getProject(ProjectId.DEFAULT)
183+
.stream()
184+
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
185+
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
186+
.collect(
187+
Collectors.toUnmodifiableMap(
188+
shardId -> shardId,
189+
shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true)
190+
)
191+
)
192+
)
193+
.build();
194+
195+
return new RoutingAllocation(
196+
new AllocationDeciders(List.of()),
197+
clusterState,
198+
clusterInfo,
199+
SnapshotShardSizeInfo.EMPTY,
200+
System.nanoTime()
201+
).mutableCloneForSimulation();
202+
}
203+
204+
private ClusterState createClusterState() {
205+
return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0);
206+
}
207+
}

0 commit comments

Comments
 (0)