From 0ebd75dcf05d4da2f2bf8f0575834f8269438393 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 17 Jul 2025 15:52:51 +1000 Subject: [PATCH 01/25] Estimate impact of shard movement using node-level write load --- .../cluster/ClusterInfoSimulator.java | 8 +- .../routing/WriteLoadPerShardSimulator.java | 151 ++++++++++++++ .../WriteLoadPerShardSimulatorTests.java | 191 ++++++++++++++++++ 3 files changed, 347 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 7e995404191d6..6785924ec9c59 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterInfo.NodeAndShard; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.WriteLoadPerShardSimulator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.util.CopyOnFirstWriteMap; import org.elasticsearch.index.shard.ShardId; @@ -34,7 +35,7 @@ public class ClusterInfoSimulator { private final Map shardDataSetSizes; private final Map dataPath; private final Map estimatedHeapUsages; - private final Map nodeThreadPoolUsageStats; + private final WriteLoadPerShardSimulator writeLoadPerShardSimulator; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -44,7 +45,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); 
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); - this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + this.writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); } /** @@ -115,6 +116,7 @@ public void simulateShardStarted(ShardRouting shard) { shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size); } } + writeLoadPerShardSimulator.simulateShardStarted(shard); } private void modifyDiskUsage(String nodeId, long freeDelta) { @@ -159,7 +161,7 @@ public ClusterInfo getClusterInfo() { dataPath, Map.of(), estimatedHeapUsages, - nodeThreadPoolUsageStats + writeLoadPerShardSimulator.nodeUsageStatsForThreadPools() ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java new file mode 100644 index 0000000000000..e2817c8530c93 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing; + +import com.carrotsearch.hppc.ObjectFloatHashMap; +import com.carrotsearch.hppc.ObjectFloatMap; + +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class WriteLoadPerShardSimulator { + + private final ObjectFloatMap writeLoadDeltas; + private final RoutingAllocation routingAllocation; + private final ObjectFloatMap writeLoadsPerShard; + + public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) { + this.routingAllocation = routingAllocation; + this.writeLoadDeltas = new ObjectFloatHashMap<>(); + writeLoadsPerShard = estimateWriteLoadsPerShard(routingAllocation); + } + + public void simulateShardStarted(ShardRouting shardRouting) { + float writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); + if (writeLoadForShard > 0.0) { + if (shardRouting.relocatingNodeId() != null) { + // relocating + writeLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); + writeLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + } else { + // not sure how this would come about, perhaps when allocating a replica after a delay? 
+ writeLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + } + } + } + + public Map nodeUsageStatsForThreadPools() { + return routingAllocation.clusterInfo() + .getNodeUsageStatsForThreadPools() + .entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> { + if (writeLoadDeltas.containsKey(e.getKey())) { + return new NodeUsageStatsForThreadPools( + e.getKey(), + Maps.copyMapWithAddedOrReplacedEntry( + e.getValue().threadPoolUsageStatsMap(), + "write", + replaceWritePoolStats(e.getValue(), writeLoadDeltas.get(e.getKey())) + ) + ); + } + return e.getValue(); + })); + } + + private NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( + NodeUsageStatsForThreadPools value, + float writeLoadDelta + ) { + final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + writeThreadPoolStats.totalThreadPoolThreads(), + writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads()), + writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() + ); + } + + // Everything below this line can probably go once we are publishing shard-write-load estimates to the master + + private static ObjectFloatMap estimateWriteLoadsPerShard(RoutingAllocation allocation) { + final Map writeLoadPerShard = new HashMap<>(); + final Set writeIndexNames = getWriteIndexNames(allocation); + final Map nodeUsageStatsForThreadPools = allocation.clusterInfo() + .getNodeUsageStatsForThreadPools(); + for (final Map.Entry usageStatsForThreadPoolsEntry : nodeUsageStatsForThreadPools + .entrySet()) { + final NodeUsageStatsForThreadPools value = usageStatsForThreadPoolsEntry.getValue(); + final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + if 
(writeThreadPoolStats == null) { + // No stats from this node yet + continue; + } + float writeUtilisation = writeThreadPoolStats.averageThreadPoolUtilization() * writeThreadPoolStats.totalThreadPoolThreads(); + + final String nodeId = usageStatsForThreadPoolsEntry.getKey(); + final RoutingNode node = allocation.routingNodes().node(nodeId); + final Set writeShardsOnNode = new HashSet<>(); + for (final ShardRouting shardRouting : node) { + if (shardRouting.role() != ShardRouting.Role.SEARCH_ONLY && writeIndexNames.contains(shardRouting.index().getName())) { + writeShardsOnNode.add(shardRouting.shardId()); + } + } + writeShardsOnNode.forEach( + shardId -> writeLoadPerShard.computeIfAbsent(shardId, k -> new Average()).add(writeUtilisation / writeShardsOnNode.size()) + ); + } + final ObjectFloatMap writeLoads = new ObjectFloatHashMap<>(writeLoadPerShard.size()); + writeLoadPerShard.forEach((shardId, average) -> writeLoads.put(shardId, average.get())); + return writeLoads; + } + + private static Set getWriteIndexNames(RoutingAllocation allocation) { + return allocation.metadata() + .projects() + .values() + .stream() + .map(ProjectMetadata::getIndicesLookup) + .flatMap(il -> il.values().stream()) + .map(IndexAbstraction::getWriteIndex) + .filter(Objects::nonNull) + .map(Index::getName) + .collect(Collectors.toUnmodifiableSet()); + } + + private static final class Average { + int count; + float sum; + + public void add(float value) { + count++; + sum += value; + } + + public float get() { + return sum / count; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java new file mode 100644 index 0000000000000..09a5cf975f7cd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -0,0 +1,191 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.snapshots.SnapshotShardSizeInfo; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.sameInstance; + +public class WriteLoadPerShardSimulatorTests extends ESTestCase { + + private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { + }; + + /** + * We should not adjust the values if there's no movement + */ + public void testNoShardMovement() { + final var originalNode0WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomIntBetween(4, 32), + randomFloatBetween(0f, 1f, true), + randomLongBetween(0L, 7_000L) + ); + final var originalNode1WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomIntBetween(4, 32), + randomFloatBetween(0f, 1f, true), + randomLongBetween(0L, 7_000L) + ); + final var 
allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + + final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); + final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); + assertThat( + calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"), + sameInstance(originalNode0WriteLoadStats) + ); + assertThat( + calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"), + sameInstance(originalNode1WriteLoadStats) + ); + } + + public void testMovementOfAShardWillReduceThreadPoolUtilisation() { + final var originalNode0WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomIntBetween(4, 16), + randomFloatBetween(0.2f, 1.0f, true), + 0 + ); + final var originalNode1WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomIntBetween(4, 16), + randomFloatBetween(0.1f, 0.5f, true), + 0 + ); + final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + + final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); + + // Relocate a random shard from node_0 to node_1 + final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); + final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", randomNonNegativeLong(), "testing", NOOP); + writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); + + final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); + + // Some node_0 utilization should have been moved to node_1 + assertThat( + getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), + 
lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization()) + ); + assertThat( + getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), + greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization()) + ); + } + + public void testMovementFollowedByMovementBackWillNotChangeAnything() { + final var originalNode0WriteLoadStats = randomUsageStats(); + final var originalNode1WriteLoadStats = randomUsageStats(); + final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); + + // Relocate a random shard from node_0 to node_1 + final long expectedShardSize = randomNonNegativeLong(); + final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); + final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); + writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); + final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); + + // Some node_0 utilization should have been moved to node_1 + assertThat( + getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), + lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization()) + ); + assertThat( + getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), + greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization()) + ); + + // Then move it back + final var moveBackTuple = allocation.routingNodes() + .relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP); + writeLoadPerShardSimulator.simulateShardStarted(moveBackTuple.v2()); + + // The utilization numbers should be back to their original values + assertThat( + getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), + 
equalTo(originalNode0WriteLoadStats.averageThreadPoolUtilization()) + ); + assertThat( + getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), + equalTo(originalNode1WriteLoadStats.averageThreadPoolUtilization()) + ); + } + + public void testMovementBetweenNodesWithNoThreadPoolStats() { + final var originalNode0WriteLoadStats = randomBoolean() ? randomUsageStats() : null; + final var originalNode1WriteLoadStats = randomBoolean() ? randomUsageStats() : null; + final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); + + // Relocate a random shard from node_0 to node_1 + final long expectedShardSize = randomNonNegativeLong(); + final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); + final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); + writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); + allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); + + final var generated = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + assertThat(generated.containsKey("node_0"), equalTo(originalNode0WriteLoadStats != null)); + assertThat(generated.containsKey("node_1"), equalTo(originalNode1WriteLoadStats != null)); + } + + private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoadPerShardSimulator, String nodeId) { + final var generatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write"); + return node0WritePoolStats.averageThreadPoolUtilization(); + } + + private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() { + return new 
NodeUsageStatsForThreadPools.ThreadPoolUsageStats(randomIntBetween(4, 16), randomFloatBetween(0.0f, 1.0f, false), 0); + } + + private RoutingAllocation createRoutingAllocation( + NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0WriteLoadStats, + NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats + ) { + final Map nodeUsageStats = new HashMap<>(); + if (node0WriteLoadStats != null) { + nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0WriteLoadStats))); + } + if (node1WriteLoadStats != null) { + nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1WriteLoadStats))); + } + + return new RoutingAllocation( + new AllocationDeciders(List.of()), + createClusterState(), + ClusterInfo.builder().nodeUsageStatsForThreadPools(nodeUsageStats).build(), + SnapshotShardSizeInfo.EMPTY, + System.nanoTime() + ).mutableCloneForSimulation(); + } + + private ClusterState createClusterState() { + return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { "indexOne", "indexTwo", "indexThree" }, 3, 0); + } +} From e589db4c0bb71bf2a057f978572de06162e7e507 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 17 Jul 2025 16:02:36 +1000 Subject: [PATCH 02/25] Naming --- .../cluster/routing/WriteLoadPerShardSimulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index e2817c8530c93..c602d7908434e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -128,7 +128,7 @@ private static Set getWriteIndexNames(RoutingAllocation allocation) { .values() .stream() .map(ProjectMetadata::getIndicesLookup) - .flatMap(il -> 
il.values().stream()) + .flatMap(indicesLookup -> indicesLookup.values().stream()) .map(IndexAbstraction::getWriteIndex) .filter(Objects::nonNull) .map(Index::getName) From beb26118e079c3a1bf068e3bd1b98cfe0c2c4f45 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 17 Jul 2025 16:07:04 +1000 Subject: [PATCH 03/25] More randomness --- .../WriteLoadPerShardSimulatorTests.java | 31 ++++++------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index 09a5cf975f7cd..0b9a12c3d347a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -38,16 +38,8 @@ public class WriteLoadPerShardSimulatorTests extends ESTestCase { * We should not adjust the values if there's no movement */ public void testNoShardMovement() { - final var originalNode0WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - randomIntBetween(4, 32), - randomFloatBetween(0f, 1f, true), - randomLongBetween(0L, 7_000L) - ); - final var originalNode1WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - randomIntBetween(4, 32), - randomFloatBetween(0f, 1f, true), - randomLongBetween(0L, 7_000L) - ); + final var originalNode0WriteLoadStats = randomUsageStats(); + final var originalNode1WriteLoadStats = randomUsageStats(); final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); @@ -64,18 +56,9 @@ public void testNoShardMovement() { } public void testMovementOfAShardWillReduceThreadPoolUtilisation() { - final var originalNode0WriteLoadStats = new 
NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - randomIntBetween(4, 16), - randomFloatBetween(0.2f, 1.0f, true), - 0 - ); - final var originalNode1WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - randomIntBetween(4, 16), - randomFloatBetween(0.1f, 0.5f, true), - 0 - ); + final var originalNode0WriteLoadStats = randomUsageStats(); + final var originalNode1WriteLoadStats = randomUsageStats(); final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); - final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); // Relocate a random shard from node_0 to node_1 @@ -161,7 +144,11 @@ private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoa } private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() { - return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(randomIntBetween(4, 16), randomFloatBetween(0.0f, 1.0f, false), 0); + return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomIntBetween(4, 16), + randomFloatBetween(0.1f, 1.0f, true), + randomLongBetween(0, 60_000) + ); } private RoutingAllocation createRoutingAllocation( From 3f90889e3c485e207cd744760438e3221bb7d378 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 17 Jul 2025 16:39:14 +1000 Subject: [PATCH 04/25] Pedantry --- .../cluster/routing/WriteLoadPerShardSimulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index c602d7908434e..69909b5af55dc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -41,7 +41,7 @@ public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) { } public void 
simulateShardStarted(ShardRouting shardRouting) { - float writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); + final float writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); if (writeLoadForShard > 0.0) { if (shardRouting.relocatingNodeId() != null) { // relocating From 0b1d4a27a5edeed1f37d56226205e454624d8778 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 18 Jul 2025 09:27:42 +1000 Subject: [PATCH 05/25] Naming --- .../routing/WriteLoadPerShardSimulator.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index 69909b5af55dc..d956ea18ce4f8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -30,13 +30,13 @@ public class WriteLoadPerShardSimulator { - private final ObjectFloatMap writeLoadDeltas; + private final ObjectFloatMap simulatedWriteLoadDeltas; private final RoutingAllocation routingAllocation; private final ObjectFloatMap writeLoadsPerShard; public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) { this.routingAllocation = routingAllocation; - this.writeLoadDeltas = new ObjectFloatHashMap<>(); + this.simulatedWriteLoadDeltas = new ObjectFloatHashMap<>(); writeLoadsPerShard = estimateWriteLoadsPerShard(routingAllocation); } @@ -45,11 +45,11 @@ public void simulateShardStarted(ShardRouting shardRouting) { if (writeLoadForShard > 0.0) { if (shardRouting.relocatingNodeId() != null) { // relocating - writeLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); - writeLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); + 
simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); } else { // not sure how this would come about, perhaps when allocating a replica after a delay? - writeLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); } } } @@ -60,13 +60,13 @@ public Map nodeUsageStatsForThreadPools() .entrySet() .stream() .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> { - if (writeLoadDeltas.containsKey(e.getKey())) { + if (simulatedWriteLoadDeltas.containsKey(e.getKey())) { return new NodeUsageStatsForThreadPools( e.getKey(), Maps.copyMapWithAddedOrReplacedEntry( e.getValue().threadPoolUsageStatsMap(), "write", - replaceWritePoolStats(e.getValue(), writeLoadDeltas.get(e.getKey())) + replaceWritePoolStats(e.getValue(), simulatedWriteLoadDeltas.get(e.getKey())) ) ); } From 9ca9b4bfac8c4f2c58a20db90dd0f9bdff2eba14 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 21 Jul 2025 16:44:10 +1000 Subject: [PATCH 06/25] Use shard write loads instead of estimating --- .../routing/WriteLoadPerShardSimulator.java | 90 +++---------------- .../WriteLoadPerShardSimulatorTests.java | 42 +++++++-- 2 files changed, 45 insertions(+), 87 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index d956ea18ce4f8..3da30eade4868 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -9,40 +9,33 @@ package org.elasticsearch.cluster.routing; -import com.carrotsearch.hppc.ObjectFloatHashMap; -import com.carrotsearch.hppc.ObjectFloatMap; +import com.carrotsearch.hppc.ObjectDoubleHashMap; +import com.carrotsearch.hppc.ObjectDoubleMap; import 
org.elasticsearch.cluster.NodeUsageStatsForThreadPools; -import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.util.Maps; -import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; public class WriteLoadPerShardSimulator { - private final ObjectFloatMap simulatedWriteLoadDeltas; + private final ObjectDoubleMap simulatedWriteLoadDeltas; private final RoutingAllocation routingAllocation; - private final ObjectFloatMap writeLoadsPerShard; + private final Map writeLoadsPerShard; public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) { this.routingAllocation = routingAllocation; - this.simulatedWriteLoadDeltas = new ObjectFloatHashMap<>(); - writeLoadsPerShard = estimateWriteLoadsPerShard(routingAllocation); + this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>(); + writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads(); } public void simulateShardStarted(ShardRouting shardRouting) { - final float writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); - if (writeLoadForShard > 0.0) { + final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); + if (writeLoadForShard != null) { if (shardRouting.relocatingNodeId() != null) { // relocating simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); @@ -76,76 +69,15 @@ public Map nodeUsageStatsForThreadPools() private NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( NodeUsageStatsForThreadPools value, - float writeLoadDelta + double writeLoadDelta ) { final 
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() .get(ThreadPool.Names.WRITE); return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( writeThreadPoolStats.totalThreadPoolThreads(), - writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads()), + (float) (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats + .totalThreadPoolThreads())), writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() ); } - - // Everything below this line can probably go once we are publishing shard-write-load estimates to the master - - private static ObjectFloatMap estimateWriteLoadsPerShard(RoutingAllocation allocation) { - final Map writeLoadPerShard = new HashMap<>(); - final Set writeIndexNames = getWriteIndexNames(allocation); - final Map nodeUsageStatsForThreadPools = allocation.clusterInfo() - .getNodeUsageStatsForThreadPools(); - for (final Map.Entry usageStatsForThreadPoolsEntry : nodeUsageStatsForThreadPools - .entrySet()) { - final NodeUsageStatsForThreadPools value = usageStatsForThreadPoolsEntry.getValue(); - final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() - .get(ThreadPool.Names.WRITE); - if (writeThreadPoolStats == null) { - // No stats from this node yet - continue; - } - float writeUtilisation = writeThreadPoolStats.averageThreadPoolUtilization() * writeThreadPoolStats.totalThreadPoolThreads(); - - final String nodeId = usageStatsForThreadPoolsEntry.getKey(); - final RoutingNode node = allocation.routingNodes().node(nodeId); - final Set writeShardsOnNode = new HashSet<>(); - for (final ShardRouting shardRouting : node) { - if (shardRouting.role() != ShardRouting.Role.SEARCH_ONLY && writeIndexNames.contains(shardRouting.index().getName())) { - writeShardsOnNode.add(shardRouting.shardId()); - } - } - writeShardsOnNode.forEach( - shardId -> 
writeLoadPerShard.computeIfAbsent(shardId, k -> new Average()).add(writeUtilisation / writeShardsOnNode.size()) - ); - } - final ObjectFloatMap writeLoads = new ObjectFloatHashMap<>(writeLoadPerShard.size()); - writeLoadPerShard.forEach((shardId, average) -> writeLoads.put(shardId, average.get())); - return writeLoads; - } - - private static Set getWriteIndexNames(RoutingAllocation allocation) { - return allocation.metadata() - .projects() - .values() - .stream() - .map(ProjectMetadata::getIndicesLookup) - .flatMap(indicesLookup -> indicesLookup.values().stream()) - .map(IndexAbstraction::getWriteIndex) - .filter(Objects::nonNull) - .map(Index::getName) - .collect(Collectors.toUnmodifiableSet()); - } - - private static final class Average { - int count; - float sum; - - public void add(float value) { - count++; - sum += value; - } - - public float get() { - return sum / count; - } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index 0b9a12c3d347a..cd456b317217b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -13,15 +13,22 @@ import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; 
import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.equalTo; @@ -33,6 +40,7 @@ public class WriteLoadPerShardSimulatorTests extends ESTestCase { private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { }; + public static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" }; /** * We should not adjust the values if there's no movement @@ -40,7 +48,7 @@ public class WriteLoadPerShardSimulatorTests extends ESTestCase { public void testNoShardMovement() { final var originalNode0WriteLoadStats = randomUsageStats(); final var originalNode1WriteLoadStats = randomUsageStats(); - final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); @@ -58,7 +66,7 @@ public void testNoShardMovement() { public void testMovementOfAShardWillReduceThreadPoolUtilisation() { final var originalNode0WriteLoadStats = randomUsageStats(); final var originalNode1WriteLoadStats = randomUsageStats(); - final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); // Relocate a random shard from node_0 to node_1 @@ -83,7 +91,7 @@ public void testMovementOfAShardWillReduceThreadPoolUtilisation() { public void testMovementFollowedByMovementBackWillNotChangeAnything() { final var originalNode0WriteLoadStats = randomUsageStats(); final var 
originalNode1WriteLoadStats = randomUsageStats(); - final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); // Relocate a random shard from node_0 to node_1 @@ -122,7 +130,11 @@ public void testMovementFollowedByMovementBackWillNotChangeAnything() { public void testMovementBetweenNodesWithNoThreadPoolStats() { final var originalNode0WriteLoadStats = randomBoolean() ? randomUsageStats() : null; final var originalNode1WriteLoadStats = randomBoolean() ? randomUsageStats() : null; - final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats); + final var allocation = createRoutingAllocation( + originalNode0WriteLoadStats, + originalNode1WriteLoadStats, + new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))) + ); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); // Relocate a random shard from node_0 to node_1 @@ -153,7 +165,8 @@ private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() { private RoutingAllocation createRoutingAllocation( NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0WriteLoadStats, - NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats + NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats, + Set indicesWithNoWriteLoad ) { final Map nodeUsageStats = new HashMap<>(); if (node0WriteLoadStats != null) { @@ -163,16 +176,29 @@ private RoutingAllocation createRoutingAllocation( nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1WriteLoadStats))); } + final ClusterState clusterState = createClusterState(); + final ClusterInfo clusterInfo = ClusterInfo.builder() + .nodeUsageStatsForThreadPools(nodeUsageStats) + .shardWriteLoads( + 
clusterState.metadata() + .getProject(ProjectId.DEFAULT) + .stream() + .filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false) + .flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum))) + .collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> randomDoubleBetween(0.1, 5.0, true))) + ) + .build(); + return new RoutingAllocation( new AllocationDeciders(List.of()), - createClusterState(), - ClusterInfo.builder().nodeUsageStatsForThreadPools(nodeUsageStats).build(), + clusterState, + clusterInfo, SnapshotShardSizeInfo.EMPTY, System.nanoTime() ).mutableCloneForSimulation(); } private ClusterState createClusterState() { - return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { "indexOne", "indexTwo", "indexThree" }, 3, 0); + return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0); } } From 988ac3c13823f660bee8f6fecf12111b89cbdb18 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 12:00:31 +1000 Subject: [PATCH 07/25] Add javadoc to WriteLoadPerShardSimulator --- .../cluster/routing/WriteLoadPerShardSimulator.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index 3da30eade4868..c4c5486ef8adb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -21,6 +21,10 @@ import java.util.Map; import java.util.stream.Collectors; +/** + * Simulates the impact to each node's write-load in response to the movement of individual + * shards around the cluster. 
+ */ public class WriteLoadPerShardSimulator { private final ObjectDoubleMap simulatedWriteLoadDeltas; From f5ed735f83f273c568fda011880c82287f2ac259 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 12:11:02 +1000 Subject: [PATCH 08/25] Explain simulateShardStarted better for the new shard case --- .../cluster/routing/WriteLoadPerShardSimulator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index c4c5486ef8adb..a7d64ac80b735 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -45,7 +45,9 @@ public void simulateShardStarted(ShardRouting shardRouting) { simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); } else { - // not sure how this would come about, perhaps when allocating a replica after a delay? + // This is a new shard starting, it's unlikely we'll have a write-load value for a new + // shard, but we may be able to estimate if the new shard is created as part of a datastream + // rollover. 
See https://elasticco.atlassian.net/browse/ES-12469 simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); } } From 8501b37fe25dde777a91c5370f3fcf5863bc8534 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 13:19:05 +1000 Subject: [PATCH 09/25] Assert on scale of utilisation change --- .../WriteLoadPerShardSimulatorTests.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index cd456b317217b..a7859dde545ed 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -31,6 +31,7 @@ import java.util.stream.IntStream; import java.util.stream.StreamSupport; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; @@ -77,14 +78,22 @@ public void testMovementOfAShardWillReduceThreadPoolUtilisation() { final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); + double shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); + final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0WriteLoadStats.totalThreadPoolThreads(); + final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1WriteLoadStats.totalThreadPoolThreads(); + // Some node_0 utilization should have been moved to node_1 assertThat( - getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), - lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization()) + (double) 
originalNode0WriteLoadStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( + writeLoadPerShardSimulator, + "node_0" + ), + closeTo(expectedUtilisationReductionAtSource, 0.001f) ); assertThat( - getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), - greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization()) + (double) getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1") - originalNode1WriteLoadStats + .averageThreadPoolUtilization(), + closeTo(expectedUtilisationIncreaseAtDestination, 0.001f) ); } From 8c21cc030732a2df57decb9c945e562d74132852 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 13:21:35 +1000 Subject: [PATCH 10/25] Improve description of relocation --- .../cluster/routing/WriteLoadPerShardSimulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index a7d64ac80b735..26673eeeef20e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -41,7 +41,7 @@ public void simulateShardStarted(ShardRouting shardRouting) { final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); if (writeLoadForShard != null) { if (shardRouting.relocatingNodeId() != null) { - // relocating + // This is a shard being relocated simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); } else { From 519d1ddefab1aff80a1b500bcb842d9751449ed7 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 13:24:09 +1000 Subject: [PATCH 11/25] Typo --- .../cluster/routing/WriteLoadPerShardSimulatorTests.java | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index a7859dde545ed..6c304646ccf3b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -75,8 +75,8 @@ public void testMovementOfAShardWillReduceThreadPoolUtilisation() { final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", randomNonNegativeLong(), "testing", NOOP); writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); - final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); - assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); + final var calculatedNodeUsageStats = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); double shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0WriteLoadStats.totalThreadPoolThreads(); From 58e84a265f3078d324bd9f6a360d50516cb8ea67 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 13:34:16 +1000 Subject: [PATCH 12/25] Rename test to indicate it also tests missing write loads --- .../cluster/routing/WriteLoadPerShardSimulatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index 6c304646ccf3b..33d549f26666b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -136,7 +136,7 @@ public void testMovementFollowedByMovementBackWillNotChangeAnything() { ); } - public void testMovementBetweenNodesWithNoThreadPoolStats() { + public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { final var originalNode0WriteLoadStats = randomBoolean() ? randomUsageStats() : null; final var originalNode1WriteLoadStats = randomBoolean() ? randomUsageStats() : null; final var allocation = createRoutingAllocation( From faccc3d879ff96abdfd82ef6789cab1366ab71ec Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 14:02:27 +1000 Subject: [PATCH 13/25] Always simulate based on original write loads and thread pool stats --- .../routing/WriteLoadPerShardSimulator.java | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index 26673eeeef20e..fc8313737f424 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -27,14 +27,14 @@ */ public class WriteLoadPerShardSimulator { + private final Map originalNodeUsageStatsForThreadPools; private final ObjectDoubleMap simulatedWriteLoadDeltas; - private final RoutingAllocation routingAllocation; private final Map writeLoadsPerShard; public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) { - this.routingAllocation = routingAllocation; + this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools(); + this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads(); this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>(); - writeLoadsPerShard = 
routingAllocation.clusterInfo().getShardWriteLoads(); } public void simulateShardStarted(ShardRouting shardRouting) { @@ -54,26 +54,22 @@ public void simulateShardStarted(ShardRouting shardRouting) { } public Map nodeUsageStatsForThreadPools() { - return routingAllocation.clusterInfo() - .getNodeUsageStatsForThreadPools() - .entrySet() - .stream() - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> { - if (simulatedWriteLoadDeltas.containsKey(e.getKey())) { - return new NodeUsageStatsForThreadPools( - e.getKey(), - Maps.copyMapWithAddedOrReplacedEntry( - e.getValue().threadPoolUsageStatsMap(), - "write", - replaceWritePoolStats(e.getValue(), simulatedWriteLoadDeltas.get(e.getKey())) - ) - ); - } - return e.getValue(); - })); + return originalNodeUsageStatsForThreadPools.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> { + if (simulatedWriteLoadDeltas.containsKey(e.getKey())) { + return new NodeUsageStatsForThreadPools( + e.getKey(), + Maps.copyMapWithAddedOrReplacedEntry( + e.getValue().threadPoolUsageStatsMap(), + "write", + replaceWritePoolStats(e.getValue(), simulatedWriteLoadDeltas.get(e.getKey())) + ) + ); + } + return e.getValue(); + })); } - private NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( + private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( NodeUsageStatsForThreadPools value, double writeLoadDelta ) { From edc259a3d1831e280d567b0b873b42f5b453362c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 14:09:52 +1000 Subject: [PATCH 14/25] Use for-loop instead of stream --- .../routing/WriteLoadPerShardSimulator.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index fc8313737f424..aafb9940a6522 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -18,8 +18,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Collections; import java.util.Map; -import java.util.stream.Collectors; /** * Simulates the impact to each node's write-load in response to the movement of individual @@ -54,19 +54,25 @@ public void simulateShardStarted(ShardRouting shardRouting) { } public Map nodeUsageStatsForThreadPools() { - return originalNodeUsageStatsForThreadPools.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> { - if (simulatedWriteLoadDeltas.containsKey(e.getKey())) { - return new NodeUsageStatsForThreadPools( - e.getKey(), + final Map adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize( + originalNodeUsageStatsForThreadPools.size() + ); + for (Map.Entry entry : originalNodeUsageStatsForThreadPools.entrySet()) { + if (simulatedWriteLoadDeltas.containsKey(entry.getKey())) { + var adjustedValue = new NodeUsageStatsForThreadPools( + entry.getKey(), Maps.copyMapWithAddedOrReplacedEntry( - e.getValue().threadPoolUsageStatsMap(), + entry.getValue().threadPoolUsageStatsMap(), "write", - replaceWritePoolStats(e.getValue(), simulatedWriteLoadDeltas.get(e.getKey())) + replaceWritePoolStats(entry.getValue(), simulatedWriteLoadDeltas.get(entry.getKey())) ) ); + adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue); + } else { + adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue()); } - return e.getValue(); - })); + } + return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools); } private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( From f60029fcfc565d7e94b4e77400accd1d4677484c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 14:16:24 +1000 
Subject: [PATCH 15/25] Consolidate similar tests --- .../WriteLoadPerShardSimulatorTests.java | 34 +++---------------- 1 file changed, 5 insertions(+), 29 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index 33d549f26666b..5db9d7a170ddf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -33,8 +33,6 @@ import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.sameInstance; public class WriteLoadPerShardSimulatorTests extends ESTestCase { @@ -64,7 +62,7 @@ public void testNoShardMovement() { ); } - public void testMovementOfAShardWillReduceThreadPoolUtilisation() { + public void testMovementOfAShardWillMoveThreadPoolUtilisation() { final var originalNode0WriteLoadStats = randomUsageStats(); final var originalNode1WriteLoadStats = randomUsageStats(); final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); @@ -72,8 +70,10 @@ public void testMovementOfAShardWillReduceThreadPoolUtilisation() { // Relocate a random shard from node_0 to node_1 final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); - final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", randomNonNegativeLong(), "testing", NOOP); + final long expectedShardSize = randomNonNegativeLong(); + final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); 
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); + final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); final var calculatedNodeUsageStats = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); @@ -95,37 +95,13 @@ public void testMovementOfAShardWillReduceThreadPoolUtilisation() { .averageThreadPoolUtilization(), closeTo(expectedUtilisationIncreaseAtDestination, 0.001f) ); - } - - public void testMovementFollowedByMovementBackWillNotChangeAnything() { - final var originalNode0WriteLoadStats = randomUsageStats(); - final var originalNode1WriteLoadStats = randomUsageStats(); - final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); - final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); - - // Relocate a random shard from node_0 to node_1 - final long expectedShardSize = randomNonNegativeLong(); - final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); - final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); - writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); - final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); - - // Some node_0 utilization should have been moved to node_1 - assertThat( - getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), - lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization()) - ); - assertThat( - getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), - greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization()) - ); // Then move it back final var moveBackTuple = allocation.routingNodes() 
.relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP); writeLoadPerShardSimulator.simulateShardStarted(moveBackTuple.v2()); - // The utilization numbers should be back to their original values + // The utilization numbers should return to their original values assertThat( getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), equalTo(originalNode0WriteLoadStats.averageThreadPoolUtilization()) From 94687e32a0cc65a51e98eda84bc3662aac59212b Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 14:18:08 +1000 Subject: [PATCH 16/25] Naming/description of nodeUsageStatsForThreadPools --- .../org/elasticsearch/cluster/ClusterInfoSimulator.java | 2 +- .../cluster/routing/WriteLoadPerShardSimulator.java | 5 ++++- .../cluster/routing/WriteLoadPerShardSimulatorTests.java | 8 ++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index f1fc35efd067e..06412c22bd216 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -161,7 +161,7 @@ public ClusterInfo getClusterInfo() { dataPath, Map.of(), estimatedHeapUsages, - writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(), + writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(), allocation.clusterInfo().getShardWriteLoads() ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java index aafb9940a6522..46c1ad9b2953b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java @@ -53,7 +53,10 @@ public void 
simulateShardStarted(ShardRouting shardRouting) { } } - public Map nodeUsageStatsForThreadPools() { + /** + * Get the node usage stats with the simulated shard movements applied + */ + public Map simulatedNodeUsageStatsForThreadPools() { final Map adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize( originalNodeUsageStatsForThreadPools.size() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index 5db9d7a170ddf..d3534513608f4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -50,7 +50,7 @@ public void testNoShardMovement() { final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); - final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + final var calculatedNodeUsageStates = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); assertThat( calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"), @@ -75,7 +75,7 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); - final var calculatedNodeUsageStats = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + final var calculatedNodeUsageStats = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); double shardWriteLoad = 
allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); @@ -129,13 +129,13 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); - final var generated = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + final var generated = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(generated.containsKey("node_0"), equalTo(originalNode0WriteLoadStats != null)); assertThat(generated.containsKey("node_1"), equalTo(originalNode1WriteLoadStats != null)); } private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoadPerShardSimulator, String nodeId) { - final var generatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(); + final var generatedNodeUsageStates = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write"); return node0WritePoolStats.averageThreadPoolUtilization(); } From 466a7e0b81f4f1bdd4cd0dca17ad3aa5b8b1703e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 14:22:57 +1000 Subject: [PATCH 17/25] Naming of test utility methods --- .../WriteLoadPerShardSimulatorTests.java | 70 +++++++++++-------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java index d3534513608f4..690023915c451 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java @@ -39,33 +39,41 @@ public class WriteLoadPerShardSimulatorTests extends ESTestCase { 
private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { }; - public static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" }; + private static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" }; /** * We should not adjust the values if there's no movement */ public void testNoShardMovement() { - final var originalNode0WriteLoadStats = randomUsageStats(); - final var originalNode1WriteLoadStats = randomUsageStats(); - final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); + final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats(); + final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats(); + final var allocation = createRoutingAllocationWithRandomisedWriteLoads( + originalNode0ThreadPoolStats, + originalNode1ThreadPoolStats, + Set.of() + ); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); final var calculatedNodeUsageStates = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); assertThat( calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"), - sameInstance(originalNode0WriteLoadStats) + sameInstance(originalNode0ThreadPoolStats) ); assertThat( calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"), - sameInstance(originalNode1WriteLoadStats) + sameInstance(originalNode1ThreadPoolStats) ); } public void testMovementOfAShardWillMoveThreadPoolUtilisation() { - final var originalNode0WriteLoadStats = randomUsageStats(); - final var originalNode1WriteLoadStats = randomUsageStats(); - final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of()); + final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats(); + final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats(); + final var allocation = 
createRoutingAllocationWithRandomisedWriteLoads( + originalNode0ThreadPoolStats, + originalNode1ThreadPoolStats, + Set.of() + ); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); // Relocate a random shard from node_0 to node_1 @@ -79,19 +87,19 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); double shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); - final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0WriteLoadStats.totalThreadPoolThreads(); - final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1WriteLoadStats.totalThreadPoolThreads(); + final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads(); + final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads(); // Some node_0 utilization should have been moved to node_1 assertThat( - (double) originalNode0WriteLoadStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( + (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( writeLoadPerShardSimulator, "node_0" ), closeTo(expectedUtilisationReductionAtSource, 0.001f) ); assertThat( - (double) getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1") - originalNode1WriteLoadStats + (double) getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1") - originalNode1ThreadPoolStats .averageThreadPoolUtilization(), closeTo(expectedUtilisationIncreaseAtDestination, 0.001f) ); @@ -104,20 +112,20 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { // The utilization numbers should return to their original values assertThat( getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), - 
equalTo(originalNode0WriteLoadStats.averageThreadPoolUtilization()) + equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization()) ); assertThat( getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), - equalTo(originalNode1WriteLoadStats.averageThreadPoolUtilization()) + equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization()) ); } public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { - final var originalNode0WriteLoadStats = randomBoolean() ? randomUsageStats() : null; - final var originalNode1WriteLoadStats = randomBoolean() ? randomUsageStats() : null; - final var allocation = createRoutingAllocation( - originalNode0WriteLoadStats, - originalNode1WriteLoadStats, + final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; + final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; + final var allocation = createRoutingAllocationWithRandomisedWriteLoads( + originalNode0ThreadPoolStats, + originalNode1ThreadPoolStats, new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))) ); final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); @@ -129,9 +137,9 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); - final var generated = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); - assertThat(generated.containsKey("node_0"), equalTo(originalNode0WriteLoadStats != null)); - assertThat(generated.containsKey("node_1"), equalTo(originalNode1WriteLoadStats != null)); + final var simulated = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); + assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null)); + assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats 
!= null)); } private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoadPerShardSimulator, String nodeId) { @@ -140,7 +148,7 @@ private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoa return node0WritePoolStats.averageThreadPoolUtilization(); } - private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() { + private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() { return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( randomIntBetween(4, 16), randomFloatBetween(0.1f, 1.0f, true), @@ -148,17 +156,17 @@ private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() { ); } - private RoutingAllocation createRoutingAllocation( - NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0WriteLoadStats, - NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats, + private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( + NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0ThreadPoolStats, + NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1ThreadPoolStats, Set indicesWithNoWriteLoad ) { final Map nodeUsageStats = new HashMap<>(); - if (node0WriteLoadStats != null) { - nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0WriteLoadStats))); + if (node0ThreadPoolStats != null) { + nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0ThreadPoolStats))); } - if (node1WriteLoadStats != null) { - nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1WriteLoadStats))); + if (node1ThreadPoolStats != null) { + nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1ThreadPoolStats))); } final ClusterState clusterState = createClusterState(); From 827f637ed09a33df44e058fab2b258b43d9c5a2a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 14:26:28 +1000 Subject: 
[PATCH 18/25] WriteLoadPerShardSimulator -> ShardMovementWriteLoadSimulator --- .../cluster/ClusterInfoSimulator.java | 10 +++--- ...a => ShardMovementWriteLoadSimulator.java} | 4 +-- ...ShardMovementWriteLoadSimulatorTests.java} | 32 +++++++++---------- 3 files changed, 23 insertions(+), 23 deletions(-) rename server/src/main/java/org/elasticsearch/cluster/routing/{WriteLoadPerShardSimulator.java => ShardMovementWriteLoadSimulator.java} (97%) rename server/src/test/java/org/elasticsearch/cluster/routing/{WriteLoadPerShardSimulatorTests.java => ShardMovementWriteLoadSimulatorTests.java} (84%) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 06412c22bd216..c445b4df2b2a1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -10,8 +10,8 @@ package org.elasticsearch.cluster; import org.elasticsearch.cluster.ClusterInfo.NodeAndShard; +import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.WriteLoadPerShardSimulator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.util.CopyOnFirstWriteMap; import org.elasticsearch.index.shard.ShardId; @@ -35,7 +35,7 @@ public class ClusterInfoSimulator { private final Map shardDataSetSizes; private final Map dataPath; private final Map estimatedHeapUsages; - private final WriteLoadPerShardSimulator writeLoadPerShardSimulator; + private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -45,7 +45,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardDataSetSizes = 
Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); - this.writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); + this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); } /** @@ -116,7 +116,7 @@ public void simulateShardStarted(ShardRouting shard) { shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size); } } - writeLoadPerShardSimulator.simulateShardStarted(shard); + shardMovementWriteLoadSimulator.simulateShardStarted(shard); } private void modifyDiskUsage(String nodeId, long freeDelta) { @@ -161,7 +161,7 @@ public ClusterInfo getClusterInfo() { dataPath, Map.of(), estimatedHeapUsages, - writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(), + shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(), allocation.clusterInfo().getShardWriteLoads() ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java similarity index 97% rename from server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java rename to server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 46c1ad9b2953b..1e8c1eefd1bf7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -25,13 +25,13 @@ * Simulates the impact to each node's write-load in response to the movement of individual * shards around the cluster. 
*/ -public class WriteLoadPerShardSimulator { +public class ShardMovementWriteLoadSimulator { private final Map originalNodeUsageStatsForThreadPools; private final ObjectDoubleMap simulatedWriteLoadDeltas; private final Map writeLoadsPerShard; - public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) { + public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) { this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools(); this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads(); this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java similarity index 84% rename from server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java rename to server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java index 690023915c451..2bc74c7a4b91b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; -public class WriteLoadPerShardSimulatorTests extends ESTestCase { +public class ShardMovementWriteLoadSimulatorTests extends ESTestCase { private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { }; @@ -53,8 +53,8 @@ public void testNoShardMovement() { Set.of() ); - final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); - final var calculatedNodeUsageStates = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); + final var shardMovementWriteLoadSimulator = new 
ShardMovementWriteLoadSimulator(allocation); + final var calculatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); assertThat( calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"), @@ -74,16 +74,16 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { originalNode1ThreadPoolStats, Set.of() ); - final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); + final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); // Relocate a random shard from node_0 to node_1 final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); final long expectedShardSize = randomNonNegativeLong(); final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); - writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); + shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); - final var calculatedNodeUsageStats = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); + final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); double shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); @@ -93,13 +93,13 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { // Some node_0 utilization should have been moved to node_1 assertThat( (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( - writeLoadPerShardSimulator, + shardMovementWriteLoadSimulator, "node_0" ), 
closeTo(expectedUtilisationReductionAtSource, 0.001f) ); assertThat( - (double) getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1") - originalNode1ThreadPoolStats + (double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats .averageThreadPoolUtilization(), closeTo(expectedUtilisationIncreaseAtDestination, 0.001f) ); @@ -107,15 +107,15 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { // Then move it back final var moveBackTuple = allocation.routingNodes() .relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP); - writeLoadPerShardSimulator.simulateShardStarted(moveBackTuple.v2()); + shardMovementWriteLoadSimulator.simulateShardStarted(moveBackTuple.v2()); // The utilization numbers should return to their original values assertThat( - getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"), + getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization()) ); assertThat( - getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"), + getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"), equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization()) ); } @@ -128,22 +128,22 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { originalNode1ThreadPoolStats, new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))) ); - final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation); + final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); // Relocate a random shard from node_0 to node_1 final long expectedShardSize = randomNonNegativeLong(); final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, 
"node_1", expectedShardSize, "testing", NOOP); - writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2()); + shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); - final var simulated = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); + final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null)); assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null)); } - private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoadPerShardSimulator, String nodeId) { - final var generatedNodeUsageStates = writeLoadPerShardSimulator.simulatedNodeUsageStatsForThreadPools(); + private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) { + final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write"); return node0WritePoolStats.averageThreadPoolUtilization(); } From a072aaf76b6b0e90fb46c991bf55576a02c348b8 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 15:11:39 +1000 Subject: [PATCH 19/25] Increase likelihood of write loads and utilizations being 0, floor utilization numbers at 0.0 --- .../ShardMovementWriteLoadSimulator.java | 6 ++-- .../ShardMovementWriteLoadSimulatorTests.java | 28 +++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 1e8c1eefd1bf7..383d409e704d2 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -86,8 +86,10 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo .get(ThreadPool.Names.WRITE); return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( writeThreadPoolStats.totalThreadPoolThreads(), - (float) (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats - .totalThreadPoolThreads())), + (float) Math.max( + (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())), + 0.0 + ), writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java index 2bc74c7a4b91b..682c858e3f640 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -91,13 +91,18 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads(); // Some node_0 utilization should have been moved to node_1 - assertThat( - (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( - shardMovementWriteLoadSimulator, - "node_0" - ), - closeTo(expectedUtilisationReductionAtSource, 0.001f) - ); + if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) { + // We don't return utilization less than zero because that makes no sense + 
assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f)); + } else { + assertThat( + (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( + shardMovementWriteLoadSimulator, + "node_0" + ), + closeTo(expectedUtilisationReductionAtSource, 0.001f) + ); + } assertThat( (double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats .averageThreadPoolUtilization(), @@ -151,7 +156,7 @@ private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator sha private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() { return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( randomIntBetween(4, 16), - randomFloatBetween(0.1f, 1.0f, true), + randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true), randomLongBetween(0, 60_000) ); } @@ -178,7 +183,12 @@ private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( .stream() .filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false) .flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum))) - .collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> randomDoubleBetween(0.1, 5.0, true))) + .collect( + Collectors.toUnmodifiableMap( + shardId -> shardId, + shardId -> randomBoolean() ? 
0.0f : randomDoubleBetween(0.1, 5.0, true) + ) + ) ) .build(); From 6ae4aa417a45ebbc228dc85465831d3aa9df1de9 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 24 Jul 2025 15:25:03 +1000 Subject: [PATCH 20/25] Pedantry --- .../routing/ShardMovementWriteLoadSimulatorTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java index 682c858e3f640..59a37747ae2fd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -78,7 +78,7 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { // Relocate a random shard from node_0 to node_1 final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); - final long expectedShardSize = randomNonNegativeLong(); + final var expectedShardSize = randomNonNegativeLong(); final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); @@ -86,7 +86,7 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); - double shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); + final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); final var 
expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads(); final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads(); @@ -136,7 +136,7 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); // Relocate a random shard from node_0 to node_1 - final long expectedShardSize = randomNonNegativeLong(); + final var expectedShardSize = randomNonNegativeLong(); final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); From 770e04deae97c5d923abd519ecb1ce720d0ad08e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 28 Jul 2025 12:05:13 +1000 Subject: [PATCH 21/25] Assert that shardStarted only happens on destination node in a relocation --- .../cluster/routing/ShardMovementWriteLoadSimulator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 383d409e704d2..71fcdcd22e810 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -41,6 +41,8 @@ public void simulateShardStarted(ShardRouting shardRouting) { final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); if (writeLoadForShard != null) { if (shardRouting.relocatingNodeId() != null) { + assert shardRouting.state() == ShardRoutingState.INITIALIZING + : "This
should only be happening on the destination node (the source node will have status RELOCATING"; // This is a shard being relocated simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); From 34472f6d7c7c7b15543936856435e04cb4429d48 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 28 Jul 2025 12:07:05 +1000 Subject: [PATCH 22/25] Typo --- .../cluster/routing/ShardMovementWriteLoadSimulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 71fcdcd22e810..e4306f185be74 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -42,7 +42,7 @@ public void simulateShardStarted(ShardRouting shardRouting) { if (writeLoadForShard != null) { if (shardRouting.relocatingNodeId() != null) { assert shardRouting.state() == ShardRoutingState.INITIALIZING - : "This should only be happening on the destination node (the source node will have status RELOCATING"; + : "This should only be happening on the destination node (the source node will have status RELOCATING)"; // This is a shard being relocated simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); From cf817807bc38557c0db3a3191caff24e8db4c44b Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 29 Jul 2025 10:43:27 +1000 Subject: [PATCH 23/25] Update server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java Co-authored-by: Dianna Hohensee --- .../cluster/routing/ShardMovementWriteLoadSimulator.java | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index e4306f185be74..ba04e85026178 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -56,7 +56,7 @@ public void simulateShardStarted(ShardRouting shardRouting) { } /** - * Get the node usage stats with the simulated shard movements applied + * Apply the simulated shard movements to the original thread pool usage stats for each node. */ public Map simulatedNodeUsageStatsForThreadPools() { final Map adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize( From 27a9ba0c49eaea56514907f86f13343328d30c07 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 29 Jul 2025 11:02:02 +1000 Subject: [PATCH 24/25] Update server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java Co-authored-by: Dianna Hohensee --- .../cluster/routing/ShardMovementWriteLoadSimulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index ba04e85026178..e3d389174ac89 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -68,7 +68,7 @@ public Map simulatedNodeUsageStatsForThrea entry.getKey(), Maps.copyMapWithAddedOrReplacedEntry( entry.getValue().threadPoolUsageStatsMap(), - "write", + ThreadPool.Names.WRITE, replaceWritePoolStats(entry.getValue(), simulatedWriteLoadDeltas.get(entry.getKey())) ) ); From 
1e0a4a0815815c74ec94ac7ba8e5a9b3a85609f2 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 29 Jul 2025 11:53:03 +1000 Subject: [PATCH 25/25] Naming --- .../routing/ShardMovementWriteLoadSimulator.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index e3d389174ac89..1a729a992583c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -28,13 +28,13 @@ public class ShardMovementWriteLoadSimulator { private final Map originalNodeUsageStatsForThreadPools; - private final ObjectDoubleMap simulatedWriteLoadDeltas; + private final ObjectDoubleMap simulatedNodeWriteLoadDeltas; private final Map writeLoadsPerShard; public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) { this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools(); this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads(); - this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>(); + this.simulatedNodeWriteLoadDeltas = new ObjectDoubleHashMap<>(); } public void simulateShardStarted(ShardRouting shardRouting) { @@ -44,13 +44,13 @@ public void simulateShardStarted(ShardRouting shardRouting) { assert shardRouting.state() == ShardRoutingState.INITIALIZING : "This should only be happening on the destination node (the source node will have status RELOCATING)"; // This is a shard being relocated - simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); - simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + simulatedNodeWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * 
writeLoadForShard); + simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); } else { // This is a new shard starting, it's unlikely we'll have a write-load value for a new // shard, but we may be able to estimate if the new shard is created as part of a datastream // rollover. See https://elasticco.atlassian.net/browse/ES-12469 - simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); } } } @@ -63,13 +63,13 @@ public Map simulatedNodeUsageStatsForThrea originalNodeUsageStatsForThreadPools.size() ); for (Map.Entry entry : originalNodeUsageStatsForThreadPools.entrySet()) { - if (simulatedWriteLoadDeltas.containsKey(entry.getKey())) { + if (simulatedNodeWriteLoadDeltas.containsKey(entry.getKey())) { var adjustedValue = new NodeUsageStatsForThreadPools( entry.getKey(), Maps.copyMapWithAddedOrReplacedEntry( entry.getValue().threadPoolUsageStatsMap(), ThreadPool.Names.WRITE, - replaceWritePoolStats(entry.getValue(), simulatedWriteLoadDeltas.get(entry.getKey())) + replaceWritePoolStats(entry.getValue(), simulatedNodeWriteLoadDeltas.get(entry.getKey())) ) ); adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue);