diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index fd9c62daebd29..c445b4df2b2a1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.cluster.ClusterInfo.NodeAndShard; +import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.util.CopyOnFirstWriteMap; @@ -34,7 +35,7 @@ public class ClusterInfoSimulator { private final Map shardDataSetSizes; private final Map dataPath; private final Map estimatedHeapUsages; - private final Map nodeThreadPoolUsageStats; + private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -44,7 +45,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); - this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); } /** @@ -115,6 +116,7 @@ public void simulateShardStarted(ShardRouting shard) { shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size); } } + shardMovementWriteLoadSimulator.simulateShardStarted(shard); } private void modifyDiskUsage(String nodeId, long freeDelta) { @@ -159,7 +161,7 @@ public ClusterInfo getClusterInfo() { dataPath, Map.of(), estimatedHeapUsages, - nodeThreadPoolUsageStats, + shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(), allocation.clusterInfo().getShardWriteLoads() ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java new file mode 100644 index 0000000000000..1a729a992583c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing; + +import com.carrotsearch.hppc.ObjectDoubleHashMap; +import com.carrotsearch.hppc.ObjectDoubleMap; + +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.Map; + +/** + * Simulates the impact to each node's write-load in response to the movement of individual + * shards around the cluster. + */ +public class ShardMovementWriteLoadSimulator { + + private final Map originalNodeUsageStatsForThreadPools; + private final ObjectDoubleMap simulatedNodeWriteLoadDeltas; + private final Map writeLoadsPerShard; + + public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) { + this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools(); + this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads(); + this.simulatedNodeWriteLoadDeltas = new ObjectDoubleHashMap<>(); + } + + public void simulateShardStarted(ShardRouting shardRouting) { + final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId()); + if (writeLoadForShard != null) { + if (shardRouting.relocatingNodeId() != null) { + assert shardRouting.state() == ShardRoutingState.INITIALIZING + : "This should only be happening on the destination node (the source node will have status RELOCATING)"; + // This is a shard being relocated + simulatedNodeWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard); + simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + } else { + // This is a new shard starting, it's unlikely we'll have a write-load value for a new + // shard, but we may be able to estimate if the new shard is created as part of a datastream + // rollover. See https://elasticco.atlassian.net/browse/ES-12469 + simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard); + } + } + } + + /** + * Apply the simulated shard movements to the original thread pool usage stats for each node. + */ + public Map simulatedNodeUsageStatsForThreadPools() { + final Map adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize( + originalNodeUsageStatsForThreadPools.size() + ); + for (Map.Entry entry : originalNodeUsageStatsForThreadPools.entrySet()) { + if (simulatedNodeWriteLoadDeltas.containsKey(entry.getKey())) { + var adjustedValue = new NodeUsageStatsForThreadPools( + entry.getKey(), + Maps.copyMapWithAddedOrReplacedEntry( + entry.getValue().threadPoolUsageStatsMap(), + ThreadPool.Names.WRITE, + replaceWritePoolStats(entry.getValue(), simulatedNodeWriteLoadDeltas.get(entry.getKey())) + ) + ); + adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue); + } else { + adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue()); + } + } + return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools); + } + + private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( + NodeUsageStatsForThreadPools value, + double writeLoadDelta + ) { + final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + writeThreadPoolStats.totalThreadPoolThreads(), + (float) Math.max( + (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())), + 0.0 + ), + writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java new file mode 100644 index 0000000000000..59a37747ae2fd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.SnapshotShardSizeInfo; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class ShardMovementWriteLoadSimulatorTests extends ESTestCase { + + private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { + }; + private static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" }; + + /** + * We should not adjust the values if there's no movement + */ + public void testNoShardMovement() { + final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats(); + final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats(); + final var allocation = createRoutingAllocationWithRandomisedWriteLoads( + originalNode0ThreadPoolStats, + originalNode1ThreadPoolStats, + Set.of() + ); + + final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); + final var calculatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); + assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2)); + assertThat( + calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"), + sameInstance(originalNode0ThreadPoolStats) + ); + assertThat( + calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"), + sameInstance(originalNode1ThreadPoolStats) + ); + } + + public void testMovementOfAShardWillMoveThreadPoolUtilisation() { + final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats(); + final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats(); + final var allocation = createRoutingAllocationWithRandomisedWriteLoads( + originalNode0ThreadPoolStats, + originalNode1ThreadPoolStats, + Set.of() + ); + final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); + + // Relocate a random shard from node_0 to node_1 + final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); + final var expectedShardSize = randomNonNegativeLong(); + final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); + shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); + final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); + + final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); + assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); + + final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); + final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads(); + final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads(); + + // Some node_0 utilization should have been moved to node_1 + if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) { + // We don't return utilization less than zero because that makes no sense + assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f)); + } else { + assertThat( + (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( + shardMovementWriteLoadSimulator, + "node_0" + ), + closeTo(expectedUtilisationReductionAtSource, 0.001f) + ); + } + assertThat( + (double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats + .averageThreadPoolUtilization(), + closeTo(expectedUtilisationIncreaseAtDestination, 0.001f) + ); + + // Then move it back + final var moveBackTuple = allocation.routingNodes() + .relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP); + shardMovementWriteLoadSimulator.simulateShardStarted(moveBackTuple.v2()); + + // The utilization numbers should return to their original values + assertThat( + getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), + equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization()) + ); + assertThat( + getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"), + equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization()) + ); + } + + public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { + final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; + final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; + final var allocation = createRoutingAllocationWithRandomisedWriteLoads( + originalNode0ThreadPoolStats, + originalNode1ThreadPoolStats, + new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))) + ); + final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); + + // Relocate a random shard from node_0 to node_1 + final var expectedShardSize = randomNonNegativeLong(); + final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); + final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); + shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); + allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); + + final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); + assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null)); + assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null)); + } + + private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) { + final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); + final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write"); + return node0WritePoolStats.averageThreadPoolUtilization(); + } + + private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() { + return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomIntBetween(4, 16), + randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true), + randomLongBetween(0, 60_000) + ); + } + + private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( + NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0ThreadPoolStats, + NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1ThreadPoolStats, + Set indicesWithNoWriteLoad + ) { + final Map nodeUsageStats = new HashMap<>(); + if (node0ThreadPoolStats != null) { + nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0ThreadPoolStats))); + } + if (node1ThreadPoolStats != null) { + nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1ThreadPoolStats))); + } + + final ClusterState clusterState = createClusterState(); + final ClusterInfo clusterInfo = ClusterInfo.builder() + .nodeUsageStatsForThreadPools(nodeUsageStats) + .shardWriteLoads( + clusterState.metadata() + .getProject(ProjectId.DEFAULT) + .stream() + .filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false) + .flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum))) + .collect( + Collectors.toUnmodifiableMap( + shardId -> shardId, + shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true) + ) + ) + ) + .build(); + + return new RoutingAllocation( + new AllocationDeciders(List.of()), + clusterState, + clusterInfo, + SnapshotShardSizeInfo.EMPTY, + System.nanoTime() + ).mutableCloneForSimulation(); + } + + private ClusterState createClusterState() { + return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0); + } +}