From 2806d306d7d4dc055e5a4a3fc1c8f6dd1beb238f Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 31 Jul 2025 16:09:10 +1000 Subject: [PATCH 01/11] Breakdown undesired allocations by tier --- .../elasticsearch/cluster/ClusterModule.java | 3 +- .../allocator/BalancingWeightsFactory.java | 9 +++ .../allocator/DesiredBalanceMetrics.java | 26 ++++++-- .../allocator/DesiredBalanceReconciler.java | 66 +++++++++++++++---- .../DesiredBalanceShardsAllocator.java | 15 +++-- .../GlobalBalancingWeightsFactory.java | 6 ++ ...nsportDeleteDesiredBalanceActionTests.java | 5 +- .../AllocationStatsServiceTests.java | 3 +- .../BalancedShardsAllocatorTests.java | 7 +- .../ClusterAllocationSimulationTests.java | 3 +- .../DesiredBalanceReconcilerTests.java | 16 ++++- .../DesiredBalanceShardsAllocatorTests.java | 30 ++++++--- .../cluster/ESAllocationTestCase.java | 3 +- 13 files changed, 155 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5633bd8b89e1e..eacb6a44da780 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -496,7 +496,8 @@ private static ShardsAllocator createShardsAllocator( clusterService, reconciler, telemetryProvider, - nodeAllocationStatsAndWeightsCalculator + nodeAllocationStatsAndWeightsCalculator, + balancingWeightsFactory ) ); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java index 23c556fe9d9d1..7748af7060dd4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java @@ -9,6 +9,8 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.cluster.routing.ShardRouting; + /** * A balancing weights factory must be able to divide all shards and nodes into mutually * disjoint partitions. Allocation balancing will then be conducted sequentially for each partition. @@ -18,4 +20,11 @@ public interface BalancingWeightsFactory { BalancingWeights create(); + + /** + * Get partition for shard + * + * @return A string identifying which partition the shard belongs to + */ + String partitionForShard(ShardRouting shardRouting); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 5771c27c5d5ab..47312254a88f0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -31,12 +31,30 @@ public class DesiredBalanceMetrics { /** * @param unassignedShards Shards that are not assigned to any node. + * @param partitionStats Allocation stats broken down by balancer partition + */ + public record AllocationStats(long unassignedShards, Map partitionStats) { + + public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { + this(unassignedShards, Map.of("global", new PartitionStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes))); + } + + public long totalAllocations() { + return partitionStats.values().stream().mapToLong(PartitionStats::totalAllocations).sum(); + } + + public long undesiredAllocationsExcludingShuttingDownNodes() { + return partitionStats.values().stream().mapToLong(PartitionStats::undesiredAllocationsExcludingShuttingDownNodes).sum(); + } + } + + /** * @param totalAllocations Shards that are assigned to a node. * @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource * constraint per the {@link AllocationDeciders}. Excludes shards that must move * because of a node shutting down. */ - public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {} + public record PartitionStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {} public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) { public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0); @@ -71,7 +89,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.forecasted_disk_usage_bytes.current"; - public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1); + public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, Map.of()); private volatile boolean nodeIsMaster = false; @@ -105,8 +123,8 @@ public void updateMetrics( assert weightStatsPerNode != null : "node balance weight stats cannot be null"; if (allocationStats != EMPTY_ALLOCATION_STATS) { this.unassignedShards = allocationStats.unassignedShards; - this.totalAllocations = allocationStats.totalAllocations; - this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes; + this.totalAllocations = allocationStats.totalAllocations(); + this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes(); } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 8e69d72777f04..511f2deb57a4b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.gateway.PriorityComparator; @@ -34,8 +35,11 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -79,12 +83,18 @@ public class DesiredBalanceReconciler { private double undesiredAllocationsLogThreshold; private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering(); private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering(); + private final BalancingWeightsFactory balancingWeightsFactory; - public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) { + public DesiredBalanceReconciler( + ClusterSettings clusterSettings, + ThreadPool threadPool, + BalancingWeightsFactory balancingWeightsFactory + ) { this.undesiredAllocationLogInterval = new FrequencyCappedAction( threadPool.relativeTimeInMillisSupplier(), TimeValue.timeValueMinutes(5) ); + this.balancingWeightsFactory = balancingWeightsFactory; clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval); clusterSettings.initializeAndWatch( UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, @@ -523,16 +533,16 @@ private void moveShards() { private DesiredBalanceMetrics.AllocationStats balance() { int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); - int totalAllocations = 0; - int undesiredAllocationsExcludingShuttingDownNodes = 0; + final AllocationStatsBuilder allocationStatsBuilder = new AllocationStatsBuilder(unassignedShards); // Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard // movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the // shards. for (final var iterator = OrderedShardsIterator.createForBalancing(allocation, moveOrdering); iterator.hasNext();) { final var shardRouting = iterator.next(); + final String shardPartition = balancingWeightsFactory.partitionForShard(shardRouting); - totalAllocations++; + allocationStatsBuilder.incrementTotalAllocations(shardPartition); if (shardRouting.started() == false) { // can only rebalance started shards @@ -552,7 +562,7 @@ private DesiredBalanceMetrics.AllocationStats balance() { if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { // shard is not on a shutting down node, nor is it on a desired node per the previous check. - undesiredAllocationsExcludingShuttingDownNodes++; + allocationStatsBuilder.incrementUndesiredAllocationsExcludingShuttingDownNodes(shardPartition); } if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { @@ -591,15 +601,16 @@ private DesiredBalanceMetrics.AllocationStats balance() { } } - maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); - return new DesiredBalanceMetrics.AllocationStats( - unassignedShards, - totalAllocations, - undesiredAllocationsExcludingShuttingDownNodes + final DesiredBalanceMetrics.AllocationStats allocationStats = allocationStatsBuilder.create(); + maybeLogUndesiredAllocationsWarning( + allocationStats.totalAllocations(), + allocationStats.undesiredAllocationsExcludingShuttingDownNodes(), + routingNodes.size() ); + return allocationStats; } - private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) { + private void maybeLogUndesiredAllocationsWarning(long totalAllocations, long undesiredAllocations, int nodeCount) { // more shards than cluster can relocate with one reroute final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount; final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * totalAllocations; @@ -662,5 +673,38 @@ private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, Rout assert target != null : "Target node is not found"; return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation); } + + private static class AllocationStatsBuilder { + private final int unassignedShards; + private final Map partitionStats = new HashMap<>(); + + private AllocationStatsBuilder(int unassignedShards) { + this.unassignedShards = unassignedShards; + } + + public DesiredBalanceMetrics.AllocationStats create() { + return new DesiredBalanceMetrics.AllocationStats( + unassignedShards, + Collections.unmodifiableMap(Maps.transformValues(partitionStats, PartitionStats::create)) + ); + } + + public void incrementTotalAllocations(String partition) { + partitionStats.computeIfAbsent(partition, p -> new PartitionStats()).totalAllocations++; + } + + public void incrementUndesiredAllocationsExcludingShuttingDownNodes(String partition) { + partitionStats.computeIfAbsent(partition, p -> new PartitionStats()).undesiredAllocationsExcludingShuttingDownNodes++; + } + + private static class PartitionStats { + long totalAllocations; + long undesiredAllocationsExcludingShuttingDownNodes; + + public DesiredBalanceMetrics.PartitionStats create() { + return new DesiredBalanceMetrics.PartitionStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes); + } + } + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 515da761d8696..477975caecb58 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -116,7 +116,8 @@ public DesiredBalanceShardsAllocator( ClusterService clusterService, DesiredBalanceReconcilerAction reconciler, TelemetryProvider telemetryProvider, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator + NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, + BalancingWeightsFactory balancingWeightsFactory ) { this( delegateAllocator, @@ -125,7 +126,8 @@ public DesiredBalanceShardsAllocator( new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator), reconciler, telemetryProvider, - nodeAllocationStatsAndWeightsCalculator + nodeAllocationStatsAndWeightsCalculator, + balancingWeightsFactory ); } @@ -136,7 +138,8 @@ public DesiredBalanceShardsAllocator( DesiredBalanceComputer desiredBalanceComputer, DesiredBalanceReconcilerAction reconciler, TelemetryProvider telemetryProvider, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator + NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, + BalancingWeightsFactory balancingWeightsFactory ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; @@ -145,7 +148,11 @@ public DesiredBalanceShardsAllocator( this.threadPool = threadPool; this.reconciler = reconciler; this.desiredBalanceComputer = desiredBalanceComputer; - this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool); + this.desiredBalanceReconciler = new DesiredBalanceReconciler( + clusterService.getClusterSettings(), + threadPool, + balancingWeightsFactory + ); this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) { @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java index eecc549c1ece4..bf29b54a67f5e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java @@ -17,6 +17,7 @@ public class GlobalBalancingWeightsFactory implements BalancingWeightsFactory { + public static final String GLOBAL_PARTITION_NAME = "global"; private final BalancerSettings balancerSettings; public GlobalBalancingWeightsFactory(BalancerSettings balancerSettings) { @@ -28,6 +29,11 @@ public BalancingWeights create() { return new GlobalBalancingWeights(); } + @Override + public String partitionForShard(ShardRouting shardRouting) { + return GLOBAL_PARTITION_NAME; + } + private class GlobalBalancingWeights implements BalancingWeights { private final WeightFunction weightFunction; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java index 999845fa73610..e00ae8e2a6055 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java @@ -25,10 +25,12 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceInput; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; @@ -121,7 +123,8 @@ public DesiredBalance compute( computer, (state, action) -> state, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) ); var allocationService = new MockAllocationService( randomAllocationDeciders(settings, clusterSettings), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 4ce195721b228..eeb2691356866 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -176,7 +176,8 @@ public void testUndesiredShardCount() { clusterService, (innerState, strategy) -> innerState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) ) { @Override public DesiredBalance getDesiredBalance() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 3667de9c65e4e..75e6ef11f7cac 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -797,7 +797,7 @@ private void addIndex( * A {@link BalancingWeightsFactory} that assumes the cluster is partitioned by the prefix * of the node and shard names before the `-`. */ - class PrefixBalancingWeightsFactory implements BalancingWeightsFactory { + static class PrefixBalancingWeightsFactory implements BalancingWeightsFactory { private final Map prefixWeights; @@ -810,6 +810,11 @@ public BalancingWeights create() { return new PrefixBalancingWeights(); } + @Override + public String partitionForShard(ShardRouting shardRouting) { + return prefix(shardRouting.getIndexName()); + } + class PrefixBalancingWeights implements BalancingWeights { @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 6ef622948f5c5..959d858b97f3f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -490,7 +490,8 @@ private Map.Entry createNewAllocationSer (clusterState, routingAllocationAction) -> strategyRef.get() .executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction), TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) ) { @Override public void allocate(RoutingAllocation allocation, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 1f8d59a958bfe..601bfdde9d836 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -112,6 +112,9 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase { + private static final GlobalBalancingWeightsFactory BALANCING_WEIGHTS_FACTORY = new GlobalBalancingWeightsFactory( + BalancerSettings.DEFAULT + ); private static AtomicReference ALLOCATION_STATS_PLACEHOLDER = new AtomicReference<>(); public void testNoChangesOnEmptyDesiredBalance() { @@ -1276,7 +1279,11 @@ public void testRebalanceDoesNotCauseHotSpots() { new ConcurrentRebalanceAllocationDecider(clusterSettings), new ThrottlingAllocationDecider(clusterSettings) }; - var reconciler = new DesiredBalanceReconciler(clusterSettings, new DeterministicTaskQueue().getThreadPool()); + var reconciler = new DesiredBalanceReconciler( + clusterSettings, + new DeterministicTaskQueue().getThreadPool(), + BALANCING_WEIGHTS_FACTORY + ); var totalOutgoingMoves = new HashMap(); for (int i = 0; i < numberOfNodes; i++) { @@ -1362,7 +1369,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() { final var timeInMillisSupplier = new AtomicLong(); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(timeInMillisSupplier::incrementAndGet); - var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool); + var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, BALANCING_WEIGHTS_FACTORY); final long initialDelayInMillis = TimeValue.timeValueMinutes(5).getMillis(); timeInMillisSupplier.addAndGet(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis)); @@ -1423,7 +1430,10 @@ private static void reconcile( final var threadPool = mock(ThreadPool.class); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); allocationStatsAtomicReference.set( - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation) + new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, BALANCING_WEIGHTS_FACTORY).reconcile( + desiredBalance, + routingAllocation + ) ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 21d547c1593b8..bec3cf6b8dce4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -89,6 +89,9 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase { private static final String LOCAL_NODE_ID = "node-1"; private static final String OTHER_NODE_ID = "node-2"; + private static final GlobalBalancingWeightsFactory BALANCING_WEIGHTS_FACTORY = new GlobalBalancingWeightsFactory( + BalancerSettings.DEFAULT + ); public void testGatewayAllocatorPreemptsAllocation() { final var nodeId = randomFrom(LOCAL_NODE_ID, OTHER_NODE_ID); @@ -174,7 +177,8 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo clusterService, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ); assertValidStats(desiredBalanceShardsAllocator.getStats()); var allocationService = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator(allocateUnassigned)); @@ -302,7 +306,8 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo clusterService, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ); var allocationService = new AllocationService( new AllocationDeciders(List.of()), @@ -421,7 +426,8 @@ boolean hasEnoughIterations(int currentIteration) { }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -549,7 +555,8 @@ public DesiredBalance compute( }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -653,7 +660,8 @@ public DesiredBalance compute( }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); @@ -746,7 +754,8 @@ public DesiredBalance compute( desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -800,7 +809,8 @@ public void testResetDesiredBalanceOnNoLongerMaster() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -850,7 +860,8 @@ public void testResetDesiredBalanceOnNodeShutdown() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ) { @Override public void resetDesiredBalance() { @@ -946,7 +957,8 @@ public DesiredBalance compute( }, (clusterState, rerouteStrategy) -> null, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + BALANCING_WEIGHTS_FACTORY ) { private ActionListener lastListener; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index a2d81e0203f15..46946fb0a6b3f 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -176,7 +176,8 @@ private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator clusterService, null, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS + EMPTY_NODE_ALLOCATION_STATS, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) ) { private RoutingAllocation lastAllocation; From 24c2887f0927c34b2a0f5b3c882b99f99942a55e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 1 Aug 2025 14:55:25 +1000 Subject: [PATCH 02/11] Break down by shard routing role instead of tier --- .../allocator/BalancingWeightsFactory.java | 9 --- .../allocator/DesiredBalanceMetrics.java | 33 ++++------ .../allocator/DesiredBalanceReconciler.java | 64 +++++-------------- .../GlobalBalancingWeightsFactory.java | 6 -- .../BalancedShardsAllocatorTests.java | 5 -- .../allocator/DesiredBalanceMetricsTests.java | 7 +- .../DesiredBalanceReconcilerTests.java | 24 +++---- 7 files changed, 44 insertions(+), 104 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java index 7748af7060dd4..23c556fe9d9d1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java @@ -9,8 +9,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.ShardRouting; - /** * A balancing weights factory must be able to divide all shards and nodes into mutually * disjoint partitions. Allocation balancing will then be conducted sequentially for each partition. @@ -20,11 +18,4 @@ public interface BalancingWeightsFactory { BalancingWeights create(); - - /** - * Get partition for shard - * - * @return A string identifying which partition the shard belongs to - */ - String partitionForShard(ShardRouting shardRouting); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 47312254a88f0..3639d0ea8ba6f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; @@ -31,30 +32,18 @@ public class DesiredBalanceMetrics { /** * @param unassignedShards Shards that are not assigned to any node. - * @param partitionStats Allocation stats broken down by balancer partition - */ - public record AllocationStats(long unassignedShards, Map partitionStats) { - - public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { - this(unassignedShards, Map.of("global", new PartitionStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes))); - } - - public long totalAllocations() { - return partitionStats.values().stream().mapToLong(PartitionStats::totalAllocations).sum(); - } - - public long undesiredAllocationsExcludingShuttingDownNodes() { - return partitionStats.values().stream().mapToLong(PartitionStats::undesiredAllocationsExcludingShuttingDownNodes).sum(); - } - } - - /** * @param totalAllocations Shards that are assigned to a node. * @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource * constraint per the {@link AllocationDeciders}. Excludes shards that must move * because of a node shutting down. + * @param undesiredAllocationsByRole A breakdown of the undesired allocations by {@link ShardRouting.Role} */ - public record PartitionStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {} + public record AllocationStats( + long unassignedShards, + long totalAllocations, + long undesiredAllocationsExcludingShuttingDownNodes, + Map undesiredAllocationsByRole + ) {} public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) { public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0); @@ -89,7 +78,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.forecasted_disk_usage_bytes.current"; - public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, Map.of()); + public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1, Map.of()); private volatile boolean nodeIsMaster = false; @@ -123,8 +112,8 @@ public void updateMetrics( assert weightStatsPerNode != null : "node balance weight stats cannot be null"; if (allocationStats != EMPTY_ALLOCATION_STATS) { this.unassignedShards = allocationStats.unassignedShards; - this.totalAllocations = allocationStats.totalAllocations(); - this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes(); + this.totalAllocations = allocationStats.totalAllocations; + this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes; } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 511f2deb57a4b..c70bfc2d1f151 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -9,6 +9,9 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.ArrayUtil; @@ -27,7 +30,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.gateway.PriorityComparator; @@ -35,11 +37,9 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collections; +import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -533,16 +533,17 @@ private void moveShards() { private DesiredBalanceMetrics.AllocationStats balance() { int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); - final AllocationStatsBuilder allocationStatsBuilder = new AllocationStatsBuilder(unassignedShards); + int totalAllocations = 0; + int undesiredAllocationsExcludingShuttingDownNodes = 0; + final ObjectLongMap undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>(); // Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard // movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the // shards. for (final var iterator = OrderedShardsIterator.createForBalancing(allocation, moveOrdering); iterator.hasNext();) { final var shardRouting = iterator.next(); - final String shardPartition = balancingWeightsFactory.partitionForShard(shardRouting); - allocationStatsBuilder.incrementTotalAllocations(shardPartition); + totalAllocations++; if (shardRouting.started() == false) { // can only rebalance started shards @@ -562,7 +563,8 @@ private DesiredBalanceMetrics.AllocationStats balance() { if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { // shard is not on a shutting down node, nor is it on a desired node per the previous check. - allocationStatsBuilder.incrementUndesiredAllocationsExcludingShuttingDownNodes(shardPartition); + undesiredAllocationsExcludingShuttingDownNodes++; + undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1); } if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { @@ -601,13 +603,14 @@ private DesiredBalanceMetrics.AllocationStats balance() { } } - final DesiredBalanceMetrics.AllocationStats allocationStats = allocationStatsBuilder.create(); - maybeLogUndesiredAllocationsWarning( - allocationStats.totalAllocations(), - allocationStats.undesiredAllocationsExcludingShuttingDownNodes(), - routingNodes.size() + maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); + return new DesiredBalanceMetrics.AllocationStats( + unassignedShards, + totalAllocations, + undesiredAllocationsExcludingShuttingDownNodes, + Arrays.stream(ShardRouting.Role.values()) + .collect(Collectors.toUnmodifiableMap(role -> role, undesiredAllocationsExcludingShuttingDownNodesByRole::get)) ); - return allocationStats; } private void maybeLogUndesiredAllocationsWarning(long totalAllocations, long undesiredAllocations, int nodeCount) { @@ -673,38 +676,5 @@ private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, Rout assert target != null : "Target node is not found"; return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation); } - - private static class AllocationStatsBuilder { - private final int unassignedShards; - private final Map partitionStats = new HashMap<>(); - - private AllocationStatsBuilder(int unassignedShards) { - this.unassignedShards = unassignedShards; - } - - public DesiredBalanceMetrics.AllocationStats create() { - return new DesiredBalanceMetrics.AllocationStats( - unassignedShards, - Collections.unmodifiableMap(Maps.transformValues(partitionStats, PartitionStats::create)) - ); - } - - public void incrementTotalAllocations(String partition) { - partitionStats.computeIfAbsent(partition, p -> new PartitionStats()).totalAllocations++; - } - - public void incrementUndesiredAllocationsExcludingShuttingDownNodes(String partition) { - partitionStats.computeIfAbsent(partition, p -> new PartitionStats()).undesiredAllocationsExcludingShuttingDownNodes++; - } - - private static class PartitionStats { - long totalAllocations; - long undesiredAllocationsExcludingShuttingDownNodes; - - public DesiredBalanceMetrics.PartitionStats create() { - return new DesiredBalanceMetrics.PartitionStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes); - } - } - } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java index bf29b54a67f5e..eecc549c1ece4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java @@ -17,7 +17,6 @@ public class GlobalBalancingWeightsFactory implements BalancingWeightsFactory { - public static final String GLOBAL_PARTITION_NAME = "global"; private final BalancerSettings balancerSettings; public GlobalBalancingWeightsFactory(BalancerSettings balancerSettings) { @@ -29,11 +28,6 @@ public BalancingWeights create() { return new GlobalBalancingWeights(); } - @Override - public String partitionForShard(ShardRouting shardRouting) { - return GLOBAL_PARTITION_NAME; - } - private class GlobalBalancingWeights implements BalancingWeights { private final WeightFunction weightFunction; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 75e6ef11f7cac..15680ab9e43a2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -810,11 +810,6 @@ public BalancingWeights create() { return new PrefixBalancingWeights(); } - @Override - public String partitionForShard(ShardRouting shardRouting) { - return prefix(shardRouting.getIndexName()); - } - class PrefixBalancingWeights implements BalancingWeights { @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java index 9e6e080f38216..48f45dd598d4c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java @@ -27,9 +27,10 @@ public void testZeroAllMetrics() { long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomNonNegativeLong(); long undesiredAllocations = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of()); assertEquals(totalAllocations, metrics.totalAllocations()); assertEquals(unassignedShards, metrics.unassignedShards()); + assertEquals(undesiredAllocations, metrics.undesiredAllocations()); metrics.zeroAllMetrics(); assertEquals(0, metrics.totalAllocations()); @@ -44,7 +45,7 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() { long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomLongBetween(100, 10000000); long undesiredAllocations = randomLongBetween(0, totalAllocations); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of()); // Collect when not master meterRegistry.getRecorder().collect(); @@ -104,7 +105,7 @@ public void testUndesiredAllocationRatioIsZeroWhenTotalShardsIsZero() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry); long unassignedShards = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0), Map.of(), Map.of()); + metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0, Map.of()), Map.of(), Map.of()); metrics.setNodeIsMaster(true); meterRegistry.getRecorder().collect(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 601bfdde9d836..5c133440f2594 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -284,7 +284,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing assertTrue(index1RoutingTable.primaryShard().unassigned()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0, Map.of()), allocationStats.get()); } // now relax the filter so that the replica of index-0 and the primary of index-1 can both be assigned to node-1, but the throttle @@ -299,7 +299,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing assertTrue(index1RoutingTable.primaryShard().initializing()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0, Map.of()), allocationStats.get()); } final var stateWithStartedPrimariesAndInitializingReplica = startInitializingShardsAndReroute( @@ -316,7 +316,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing assertTrue(index1RoutingTable.primaryShard().started()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0, Map.of()), allocationStats.get()); } } @@ -913,7 +913,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1")); } assertNotNull(allocationStats); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get()); // Only allow allocation on two of the nodes, excluding the other two nodes. clusterSettings.applySettings( @@ -929,7 +929,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no // movement needed assertNotNull(allocationStats); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get()); desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3"))); @@ -940,12 +940,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertNotNull(allocationStats); // Total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice. - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get()); // Ensuring that we check the shortcut two-param canAllocate() method up front canAllocateRef.set(Decision.NO); assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get()); canAllocateRef.set(Decision.YES); // Restore filter to default @@ -983,7 +983,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat "test", ActionListener.noop() ); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3, Map.of(ShardRouting.Role.DEFAULT, 3L)), allocationStats.get()); assertThat(shuttingDownState.getRoutingNodes().node("node-2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); } @@ -1051,7 +1051,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat // All still on desired nodes, no movement needed, cluster state remains the same. assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get()); desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3"))); @@ -1079,7 +1079,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get()); // Test that the AllocationStats are still updated, even though throttling is active. The cluster state should remain unchanged // because due to throttling: the previous reroute request started relocating two shards and, since those reallocations have not @@ -1087,7 +1087,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertSame(reroutedState, allocationService.reroute(reroutedState, "test", ActionListener.noop())); assertNotNull(allocationStats); // Note: total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice. - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get()); } public void testDoNotRebalanceToTheNodeThatNoLongerExists() { @@ -1300,7 +1300,7 @@ public void testRebalanceDoesNotCauseHotSpots() { var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING); if (initializing.isEmpty()) { - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0), allocationStats); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0, Map.of()), allocationStats); break; } From 47c4516735c89a524d81a74762479a54e68e0d07 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 1 Aug 2025 15:03:09 +1000 Subject: [PATCH 03/11] Remove remaining cruft --- .../elasticsearch/cluster/ClusterModule.java | 3 +- .../allocator/DesiredBalanceReconciler.java | 10 ++----- .../DesiredBalanceShardsAllocator.java | 15 +++------- ...nsportDeleteDesiredBalanceActionTests.java | 5 +--- .../AllocationStatsServiceTests.java | 3 +- .../BalancedShardsAllocatorTests.java | 2 +- .../ClusterAllocationSimulationTests.java | 3 +- .../allocator/DesiredBalanceMetricsTests.java | 1 - .../DesiredBalanceReconcilerTests.java | 16 ++-------- .../DesiredBalanceShardsAllocatorTests.java | 30 ++++++------------- .../cluster/ESAllocationTestCase.java | 3 +- 11 files changed, 24 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index eacb6a44da780..5633bd8b89e1e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -496,8 +496,7 @@ private static ShardsAllocator createShardsAllocator( clusterService, reconciler, telemetryProvider, - nodeAllocationStatsAndWeightsCalculator, - balancingWeightsFactory + nodeAllocationStatsAndWeightsCalculator ) ); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index c70bfc2d1f151..6adeeb779c60e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -83,18 +83,12 @@ public class DesiredBalanceReconciler { private double undesiredAllocationsLogThreshold; private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering(); private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering(); - private final BalancingWeightsFactory balancingWeightsFactory; - public DesiredBalanceReconciler( - ClusterSettings clusterSettings, - ThreadPool threadPool, - BalancingWeightsFactory balancingWeightsFactory - ) { + public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) { this.undesiredAllocationLogInterval = new FrequencyCappedAction( threadPool.relativeTimeInMillisSupplier(), TimeValue.timeValueMinutes(5) ); - this.balancingWeightsFactory = balancingWeightsFactory; clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval); clusterSettings.initializeAndWatch( UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, @@ -613,7 +607,7 @@ private DesiredBalanceMetrics.AllocationStats balance() { ); } - private void maybeLogUndesiredAllocationsWarning(long totalAllocations, long undesiredAllocations, int nodeCount) { + private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) { // more shards than cluster can relocate with one reroute final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount; final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * totalAllocations; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 477975caecb58..515da761d8696 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -116,8 +116,7 @@ public DesiredBalanceShardsAllocator( ClusterService clusterService, DesiredBalanceReconcilerAction reconciler, TelemetryProvider telemetryProvider, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, - BalancingWeightsFactory balancingWeightsFactory + NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { this( delegateAllocator, @@ -126,8 +125,7 @@ public DesiredBalanceShardsAllocator( new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator), reconciler, telemetryProvider, - nodeAllocationStatsAndWeightsCalculator, - balancingWeightsFactory + nodeAllocationStatsAndWeightsCalculator ); } @@ -138,8 +136,7 @@ public DesiredBalanceShardsAllocator( DesiredBalanceComputer desiredBalanceComputer, DesiredBalanceReconcilerAction reconciler, TelemetryProvider telemetryProvider, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator, - BalancingWeightsFactory balancingWeightsFactory + NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; @@ -148,11 +145,7 @@ public DesiredBalanceShardsAllocator( this.threadPool = threadPool; this.reconciler = reconciler; this.desiredBalanceComputer = desiredBalanceComputer; - this.desiredBalanceReconciler = new DesiredBalanceReconciler( - clusterService.getClusterSettings(), - threadPool, - balancingWeightsFactory - ); + this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool); this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) { @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java index e00ae8e2a6055..999845fa73610 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java @@ -25,12 +25,10 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceInput; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; -import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; @@ -123,8 +121,7 @@ public DesiredBalance compute( computer, (state, action) -> state, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + EMPTY_NODE_ALLOCATION_STATS ); var allocationService = new MockAllocationService( randomAllocationDeciders(settings, clusterSettings), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index eeb2691356866..4ce195721b228 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -176,8 +176,7 @@ public void testUndesiredShardCount() { clusterService, (innerState, strategy) -> innerState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + EMPTY_NODE_ALLOCATION_STATS ) { @Override public DesiredBalance getDesiredBalance() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 15680ab9e43a2..3667de9c65e4e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -797,7 +797,7 @@ private void addIndex( * A {@link BalancingWeightsFactory} that assumes the cluster is partitioned by the prefix * of the node and shard names before the `-`. */ - static class PrefixBalancingWeightsFactory implements BalancingWeightsFactory { + class PrefixBalancingWeightsFactory implements BalancingWeightsFactory { private final Map prefixWeights; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 959d858b97f3f..6ef622948f5c5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -490,8 +490,7 @@ private Map.Entry createNewAllocationSer (clusterState, routingAllocationAction) -> strategyRef.get() .executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction), TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + EMPTY_NODE_ALLOCATION_STATS ) { @Override public void allocate(RoutingAllocation allocation, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java index 48f45dd598d4c..28a3261b9a0c6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java @@ -30,7 +30,6 @@ public void testZeroAllMetrics() { metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of()); assertEquals(totalAllocations, metrics.totalAllocations()); assertEquals(unassignedShards, metrics.unassignedShards()); - assertEquals(undesiredAllocations, metrics.undesiredAllocations()); metrics.zeroAllMetrics(); assertEquals(0, metrics.totalAllocations()); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 5c133440f2594..35d52410c8e8a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -112,9 +112,6 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase { - private static final GlobalBalancingWeightsFactory BALANCING_WEIGHTS_FACTORY = new GlobalBalancingWeightsFactory( - BalancerSettings.DEFAULT - ); private static AtomicReference ALLOCATION_STATS_PLACEHOLDER = new AtomicReference<>(); public void testNoChangesOnEmptyDesiredBalance() { @@ -1279,11 +1276,7 @@ public void testRebalanceDoesNotCauseHotSpots() { new ConcurrentRebalanceAllocationDecider(clusterSettings), new ThrottlingAllocationDecider(clusterSettings) }; - var reconciler = new DesiredBalanceReconciler( - clusterSettings, - new DeterministicTaskQueue().getThreadPool(), - BALANCING_WEIGHTS_FACTORY - ); + var reconciler = new DesiredBalanceReconciler(clusterSettings, new DeterministicTaskQueue().getThreadPool()); var totalOutgoingMoves = new HashMap(); for (int i = 0; i < numberOfNodes; i++) { @@ -1369,7 +1362,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() { final var timeInMillisSupplier = new AtomicLong(); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(timeInMillisSupplier::incrementAndGet); - var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, BALANCING_WEIGHTS_FACTORY); + var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool); final long initialDelayInMillis = TimeValue.timeValueMinutes(5).getMillis(); timeInMillisSupplier.addAndGet(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis)); @@ -1430,10 +1423,7 @@ private static void reconcile( final var threadPool = mock(ThreadPool.class); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); allocationStatsAtomicReference.set( - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, BALANCING_WEIGHTS_FACTORY).reconcile( - desiredBalance, - routingAllocation - ) + new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation) ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index bec3cf6b8dce4..21d547c1593b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -89,9 +89,6 @@ public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase { private static final String LOCAL_NODE_ID = "node-1"; private static final String OTHER_NODE_ID = "node-2"; - private static final GlobalBalancingWeightsFactory BALANCING_WEIGHTS_FACTORY = new GlobalBalancingWeightsFactory( - BalancerSettings.DEFAULT - ); public void testGatewayAllocatorPreemptsAllocation() { final var nodeId = randomFrom(LOCAL_NODE_ID, OTHER_NODE_ID); @@ -177,8 +174,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo clusterService, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ); assertValidStats(desiredBalanceShardsAllocator.getStats()); var allocationService = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator(allocateUnassigned)); @@ -306,8 +302,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo clusterService, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ); var allocationService = new AllocationService( new AllocationDeciders(List.of()), @@ -426,8 +421,7 @@ boolean hasEnoughIterations(int currentIteration) { }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -555,8 +549,7 @@ public DesiredBalance compute( }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); allocationServiceRef.set(allocationService); @@ -660,8 +653,7 @@ public DesiredBalance compute( }, reconcileAction, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ); var allocationService = createAllocationService(desiredBalanceShardsAllocator, gatewayAllocator); @@ -754,8 +746,7 @@ public DesiredBalance compute( desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -809,8 +800,7 @@ public void testResetDesiredBalanceOnNoLongerMaster() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ); var service = createAllocationService(desiredBalanceShardsAllocator, createGatewayAllocator()); @@ -860,8 +850,7 @@ public void testResetDesiredBalanceOnNodeShutdown() { desiredBalanceComputer, (reconcilerClusterState, rerouteStrategy) -> reconcilerClusterState, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ) { @Override public void resetDesiredBalance() { @@ -957,8 +946,7 @@ public DesiredBalance compute( }, (clusterState, rerouteStrategy) -> null, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - BALANCING_WEIGHTS_FACTORY + EMPTY_NODE_ALLOCATION_STATS ) { private ActionListener lastListener; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 46946fb0a6b3f..a2d81e0203f15 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -176,8 +176,7 @@ private static DesiredBalanceShardsAllocator createDesiredBalanceShardsAllocator clusterService, null, TelemetryProvider.NOOP, - EMPTY_NODE_ALLOCATION_STATS, - new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + EMPTY_NODE_ALLOCATION_STATS ) { private RoutingAllocation lastAllocation; From 5ba30bbc0a7e764d60317d4b0caf1aee8e20c4d3 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 1 Aug 2025 15:14:35 +1000 Subject: [PATCH 04/11] Naming --- .../routing/allocation/allocator/DesiredBalanceMetrics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 3639d0ea8ba6f..9cd3f36874236 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -36,13 +36,13 @@ public class DesiredBalanceMetrics { * @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource * constraint per the {@link AllocationDeciders}. Excludes shards that must move * because of a node shutting down. - * @param undesiredAllocationsByRole A breakdown of the undesired allocations by {@link ShardRouting.Role} + * @param undesiredAllocationsExcludingShuttingDownNodesByRole A breakdown of the undesired allocations by {@link ShardRouting.Role} */ public record AllocationStats( long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes, - Map undesiredAllocationsByRole + Map undesiredAllocationsExcludingShuttingDownNodesByRole ) {} public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) { From 5fb7f21fc3637a0e3ccbd859700df1e7ca38023a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 1 Aug 2025 15:24:46 +1000 Subject: [PATCH 05/11] Only output roles that have undesired allocations --- .../allocation/allocator/DesiredBalanceReconciler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 6adeeb779c60e..a3c658ffce0d7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -37,13 +37,13 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE; import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize; @@ -602,8 +602,8 @@ private DesiredBalanceMetrics.AllocationStats balance() { unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, - Arrays.stream(ShardRouting.Role.values()) - .collect(Collectors.toUnmodifiableMap(role -> role, undesiredAllocationsExcludingShuttingDownNodesByRole::get)) + StreamSupport.stream(undesiredAllocationsExcludingShuttingDownNodesByRole.spliterator(), false) + .collect(Collectors.toUnmodifiableMap(lc -> lc.key, lc -> lc.value)) ); } From bef60eca53cfbfc5f6b56035ef5925696ae88c61 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 11 Aug 2025 16:19:41 +1000 Subject: [PATCH 06/11] Break down allocation stats by tier for IngestMetricsService --- .../allocator/DesiredBalanceMetrics.java | 58 +++++++++++++++---- .../allocator/DesiredBalanceReconciler.java | 16 +++-- .../DesiredBalanceShardsAllocator.java | 4 ++ .../allocator/DesiredBalanceMetricsTests.java | 6 +- .../DesiredBalanceReconcilerTests.java | 24 ++++---- 5 files changed, 79 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 9cd3f36874236..df76e21dec743 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -32,18 +32,45 @@ public class DesiredBalanceMetrics { /** * @param unassignedShards Shards that are not assigned to any node. + * @param allocationStatsByRole A breakdown of the allocations stats by {@link ShardRouting.Role} + */ + public record AllocationStats(long unassignedShards, Map allocationStatsByRole) { + + public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { + this( + unassignedShards, + Map.of(ShardRouting.Role.DEFAULT, new RoleAllocationStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes)) + ); + } + + public long totalAllocations() { + return allocationStatsByRole.values().stream().mapToLong(RoleAllocationStats::totalAllocations).sum(); + } + + public long undesiredAllocationsExcludingShuttingDownNodes() { + return allocationStatsByRole.values() + .stream() + .mapToLong(RoleAllocationStats::undesiredAllocationsExcludingShuttingDownNodes) + .sum(); + } + } + + /** * @param totalAllocations Shards that are assigned to a node. * @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource * constraint per the {@link AllocationDeciders}. Excludes shards that must move * because of a node shutting down. - * @param undesiredAllocationsExcludingShuttingDownNodesByRole A breakdown of the undesired allocations by {@link ShardRouting.Role} */ - public record AllocationStats( - long unassignedShards, - long totalAllocations, - long undesiredAllocationsExcludingShuttingDownNodes, - Map undesiredAllocationsExcludingShuttingDownNodesByRole - ) {} + public record RoleAllocationStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { + public static final RoleAllocationStats EMPTY = new RoleAllocationStats(0L, 0L); + + public float undesiredAllocationsRatio() { + if (totalAllocations == 0) { + return 0f; + } + return undesiredAllocationsExcludingShuttingDownNodes / (float) totalAllocations; + } + } public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) { public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0); @@ -78,7 +105,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.forecasted_disk_usage_bytes.current"; - public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1, Map.of()); + public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, Map.of()); private volatile boolean nodeIsMaster = false; @@ -98,6 +125,12 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w */ private volatile long undesiredAllocationsExcludingShuttingDownNodes; + /** + * A breakdown of shards assigned and the undesired allocations from the last reconciliation + * broken down by {@link ShardRouting.Role}. + */ + private volatile Map allocationStatsByRole; + private final AtomicReference> weightStatsPerNodeRef = new AtomicReference<>(Map.of()); private final AtomicReference> allocationStatsPerNodeRef = new AtomicReference<>( Map.of() @@ -112,8 +145,9 @@ public void updateMetrics( assert weightStatsPerNode != null : "node balance weight stats cannot be null"; if (allocationStats != EMPTY_ALLOCATION_STATS) { this.unassignedShards = allocationStats.unassignedShards; - this.totalAllocations = allocationStats.totalAllocations; - this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes; + this.totalAllocations = allocationStats.totalAllocations(); + this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes(); + this.allocationStatsByRole = allocationStats.allocationStatsByRole(); } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); @@ -223,6 +257,10 @@ public long undesiredAllocations() { return undesiredAllocationsExcludingShuttingDownNodes; } + public Map allocationStatsByRole() { + return allocationStatsByRole; + } + private List getUnassignedShardsMetrics() { return getIfPublishing(unassignedShards); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index a3c658ffce0d7..c55e2a23ab8fa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -529,6 +529,7 @@ private DesiredBalanceMetrics.AllocationStats balance() { int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); int totalAllocations = 0; int undesiredAllocationsExcludingShuttingDownNodes = 0; + final ObjectLongMap totalAllocationsByRole = new ObjectLongHashMap<>(); final ObjectLongMap undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>(); // Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard @@ -538,6 +539,7 @@ private DesiredBalanceMetrics.AllocationStats balance() { final var shardRouting = iterator.next(); totalAllocations++; + totalAllocationsByRole.addTo(shardRouting.role(), 1); if (shardRouting.started() == false) { // can only rebalance started shards @@ -600,10 +602,16 @@ private DesiredBalanceMetrics.AllocationStats balance() { maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); return new DesiredBalanceMetrics.AllocationStats( unassignedShards, - totalAllocations, - undesiredAllocationsExcludingShuttingDownNodes, - StreamSupport.stream(undesiredAllocationsExcludingShuttingDownNodesByRole.spliterator(), false) - .collect(Collectors.toUnmodifiableMap(lc -> lc.key, lc -> lc.value)) + StreamSupport.stream(totalAllocationsByRole.spliterator(), false) + .collect( + Collectors.toUnmodifiableMap( + lc -> lc.key, + lc -> new DesiredBalanceMetrics.RoleAllocationStats( + totalAllocationsByRole.get(lc.key), + undesiredAllocationsExcludingShuttingDownNodesByRole.get(lc.key) + ) + ) + ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 515da761d8696..70389efc41a74 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -438,6 +438,10 @@ public DesiredBalanceStats getStats() { ); } + public Map getAllocationStatsByRole() { + return desiredBalanceMetrics.allocationStatsByRole(); + } + private void onNoLongerMaster() { if (indexGenerator.getAndSet(-1) != -1) { currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java index 28a3261b9a0c6..9e6e080f38216 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java @@ -27,7 +27,7 @@ public void testZeroAllMetrics() { long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomNonNegativeLong(); long undesiredAllocations = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of()); + metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); assertEquals(totalAllocations, metrics.totalAllocations()); assertEquals(unassignedShards, metrics.unassignedShards()); assertEquals(undesiredAllocations, metrics.undesiredAllocations()); @@ -44,7 +44,7 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() { long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomLongBetween(100, 10000000); long undesiredAllocations = randomLongBetween(0, totalAllocations); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of()); + metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); // Collect when not master meterRegistry.getRecorder().collect(); @@ -104,7 +104,7 @@ public void testUndesiredAllocationRatioIsZeroWhenTotalShardsIsZero() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry); long unassignedShards = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0, Map.of()), Map.of(), Map.of()); + metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0), Map.of(), Map.of()); metrics.setNodeIsMaster(true); meterRegistry.getRecorder().collect(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 35d52410c8e8a..1f8d59a958bfe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -281,7 +281,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing assertTrue(index1RoutingTable.primaryShard().unassigned()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0, Map.of()), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0), allocationStats.get()); } // now relax the filter so that the replica of index-0 and the primary of index-1 can both be assigned to node-1, but the throttle @@ -296,7 +296,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing assertTrue(index1RoutingTable.primaryShard().initializing()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0, Map.of()), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0), allocationStats.get()); } final var stateWithStartedPrimariesAndInitializingReplica = startInitializingShardsAndReroute( @@ -313,7 +313,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing assertTrue(index1RoutingTable.primaryShard().started()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0, Map.of()), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0), allocationStats.get()); } } @@ -910,7 +910,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1")); } assertNotNull(allocationStats); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); // Only allow allocation on two of the nodes, excluding the other two nodes. clusterSettings.applySettings( @@ -926,7 +926,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no // movement needed assertNotNull(allocationStats); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3"))); @@ -937,12 +937,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertNotNull(allocationStats); // Total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice. - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get()); // Ensuring that we check the shortcut two-param canAllocate() method up front canAllocateRef.set(Decision.NO); assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); canAllocateRef.set(Decision.YES); // Restore filter to default @@ -980,7 +980,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat "test", ActionListener.noop() ); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3, Map.of(ShardRouting.Role.DEFAULT, 3L)), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3), allocationStats.get()); assertThat(shuttingDownState.getRoutingNodes().node("node-2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); } @@ -1048,7 +1048,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat // All still on desired nodes, no movement needed, cluster state remains the same. assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3"))); @@ -1076,7 +1076,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertNotNull(allocationStats.get()); - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); // Test that the AllocationStats are still updated, even though throttling is active. The cluster state should remain unchanged // because due to throttling: the previous reroute request started relocating two shards and, since those reallocations have not @@ -1084,7 +1084,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertSame(reroutedState, allocationService.reroute(reroutedState, "test", ActionListener.noop())); assertNotNull(allocationStats); // Note: total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice. - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get()); } public void testDoNotRebalanceToTheNodeThatNoLongerExists() { @@ -1293,7 +1293,7 @@ public void testRebalanceDoesNotCauseHotSpots() { var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING); if (initializing.isEmpty()) { - assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0, Map.of()), allocationStats); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0), allocationStats); break; } From 78ee79ad59fc6ae9ae83f29914f92ecdeb24559f Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 11 Aug 2025 16:47:48 +1000 Subject: [PATCH 07/11] Return undesired allocations ratio as double (to match setting) --- .../allocation/allocator/DesiredBalanceMetrics.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index df76e21dec743..cf626ac7a6a1b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -64,11 +64,16 @@ public long undesiredAllocationsExcludingShuttingDownNodes() { public record RoleAllocationStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { public static final RoleAllocationStats EMPTY = new RoleAllocationStats(0L, 0L); - public float undesiredAllocationsRatio() { + /** + * Return the ratio of undesired allocations to the total number of allocations. + * + * @return a value in [0.0, 1.0] + */ + public double undesiredAllocationsRatio() { if (totalAllocations == 0) { - return 0f; + return 0.0; } - return undesiredAllocationsExcludingShuttingDownNodes / (float) totalAllocations; + return undesiredAllocationsExcludingShuttingDownNodes / (double) totalAllocations; } } From 89eee2cc09f9706c04281cf8d4c26f48c9ce3557 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 12 Aug 2025 09:28:09 +1000 Subject: [PATCH 08/11] Tidy up DesiredBalanceMetrics --- .../allocator/DesiredBalanceMetrics.java | 76 +++++++++---------- 1 file changed, 34 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index cf626ac7a6a1b..36b1269719800 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.ToLongFunction; /** * Maintains balancer metrics and makes them accessible to the {@link MeterRegistry} and APM reporting. Metrics are updated @@ -53,6 +54,19 @@ public long undesiredAllocationsExcludingShuttingDownNodes() { .mapToLong(RoleAllocationStats::undesiredAllocationsExcludingShuttingDownNodes) .sum(); } + + /** + * Return the ratio of undesired allocations to the total number of allocations. + * + * @return a value in [0.0, 1.0] + */ + public double undesiredAllocationsRatio() { + long totalAllocations = totalAllocations(); + if (totalAllocations == 0) { + return 0; + } + return undesiredAllocationsExcludingShuttingDownNodes() / (double) totalAllocations; + } } /** @@ -86,7 +100,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current"; /** See {@link #totalAllocations} */ public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current"; - /** See {@link #undesiredAllocationsExcludingShuttingDownNodes} */ + /** See {@link #undesiredAllocations} */ public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current"; /** {@link #UNDESIRED_ALLOCATION_COUNT_METRIC_NAME} / {@link #TOTAL_SHARDS_METRIC_NAME} */ public static final String UNDESIRED_ALLOCATION_RATIO_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.ratio"; @@ -110,31 +124,14 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.forecasted_disk_usage_bytes.current"; - public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, Map.of()); + public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(0, Map.of()); private volatile boolean nodeIsMaster = false; /** - * Number of unassigned shards during last reconciliation - */ - private volatile long unassignedShards; - - /** - * Total number of assigned shards during last reconciliation - */ - private volatile long totalAllocations; - - /** - * Number of assigned shards during last reconciliation that are not allocated on a desired node and need to be moved. - * This excludes shards that must be reassigned due to a shutting down node. - */ - private volatile long undesiredAllocationsExcludingShuttingDownNodes; - - /** - * A breakdown of shards assigned and the undesired allocations from the last reconciliation - * broken down by {@link ShardRouting.Role}. + * The stats from the most recent reconciliation */ - private volatile Map allocationStatsByRole; + private volatile AllocationStats lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS; private final AtomicReference> weightStatsPerNodeRef = new AtomicReference<>(Map.of()); private final AtomicReference> allocationStatsPerNodeRef = new AtomicReference<>( @@ -149,10 +146,7 @@ public void updateMetrics( assert allocationStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; if (allocationStats != EMPTY_ALLOCATION_STATS) { - this.unassignedShards = allocationStats.unassignedShards; - this.totalAllocations = allocationStats.totalAllocations(); - this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes(); - this.allocationStatsByRole = allocationStats.allocationStatsByRole(); + this.lastReconciliationAllocationStats = allocationStats; } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); @@ -251,23 +245,23 @@ public void setNodeIsMaster(boolean nodeIsMaster) { } public long unassignedShards() { - return unassignedShards; + return lastReconciliationAllocationStats.unassignedShards(); } public long totalAllocations() { - return totalAllocations; + return lastReconciliationAllocationStats.totalAllocations(); } public long undesiredAllocations() { - return undesiredAllocationsExcludingShuttingDownNodes; + return lastReconciliationAllocationStats.undesiredAllocationsExcludingShuttingDownNodes(); } public Map allocationStatsByRole() { - return allocationStatsByRole; + return lastReconciliationAllocationStats.allocationStatsByRole(); } private List getUnassignedShardsMetrics() { - return getIfPublishing(unassignedShards); + return getIfPublishing(AllocationStats::unassignedShards); } private List getDesiredBalanceNodeWeightMetrics() { @@ -396,25 +390,25 @@ private Map getNodeAttributes(DiscoveryNode node) { } private List getTotalAllocationsMetrics() { - return getIfPublishing(totalAllocations); + return getIfPublishing(AllocationStats::totalAllocations); } private List getUndesiredAllocationsExcludingShuttingDownNodesMetrics() { - return getIfPublishing(undesiredAllocationsExcludingShuttingDownNodes); + return getIfPublishing(AllocationStats::undesiredAllocationsExcludingShuttingDownNodes); } - private List getIfPublishing(long value) { - if (nodeIsMaster) { - return List.of(new LongWithAttributes(value)); + private List getIfPublishing(ToLongFunction value) { + var currentStats = lastReconciliationAllocationStats; + if (currentStats != EMPTY_ALLOCATION_STATS) { + return List.of(new LongWithAttributes(value.applyAsLong(currentStats))); } return List.of(); } private List getUndesiredAllocationsRatioMetrics() { - if (nodeIsMaster) { - var total = totalAllocations; - var undesired = undesiredAllocationsExcludingShuttingDownNodes; - return List.of(new DoubleWithAttributes(total != 0 ? (double) undesired / total : 0.0)); + var currentStats = lastReconciliationAllocationStats; + if (currentStats != EMPTY_ALLOCATION_STATS) { + return List.of(new DoubleWithAttributes(currentStats.undesiredAllocationsRatio())); } return List.of(); } @@ -424,9 +418,7 @@ private List getUndesiredAllocationsRatioMetrics() { * This is best-effort because it is possible for {@link #updateMetrics} to race with this method. */ public void zeroAllMetrics() { - unassignedShards = 0; - totalAllocations = 0; - undesiredAllocationsExcludingShuttingDownNodes = 0; + lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS; weightStatsPerNodeRef.set(Map.of()); allocationStatsPerNodeRef.set(Map.of()); } From 6f41c891aff124ed50f830467ee3f1bd33dc0ba9 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 12 Aug 2025 09:56:41 +1000 Subject: [PATCH 09/11] Only publish when node is master AND we have done a reconciliation --- .../routing/allocation/allocator/DesiredBalanceMetrics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 36b1269719800..1a6d0fd221534 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -399,7 +399,7 @@ private List getUndesiredAllocationsExcludingShuttingDownNod private List getIfPublishing(ToLongFunction value) { var currentStats = lastReconciliationAllocationStats; - if (currentStats != EMPTY_ALLOCATION_STATS) { + if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) { return List.of(new LongWithAttributes(value.applyAsLong(currentStats))); } return List.of(); @@ -407,7 +407,7 @@ private List getIfPublishing(ToLongFunction private List getUndesiredAllocationsRatioMetrics() { var currentStats = lastReconciliationAllocationStats; - if (currentStats != EMPTY_ALLOCATION_STATS) { + if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) { return List.of(new DoubleWithAttributes(currentStats.undesiredAllocationsRatio())); } return List.of(); From 1b0d35a64a3c91aaed242cdaea12b9e0733f6e13 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 14 Aug 2025 11:43:44 +1000 Subject: [PATCH 10/11] Use AllocationStats instead of map --- .../routing/allocation/allocator/DesiredBalanceMetrics.java | 4 ++-- .../allocation/allocator/DesiredBalanceShardsAllocator.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 1a6d0fd221534..8e7de32acfa31 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -256,8 +256,8 @@ public long undesiredAllocations() { return lastReconciliationAllocationStats.undesiredAllocationsExcludingShuttingDownNodes(); } - public Map allocationStatsByRole() { - return lastReconciliationAllocationStats.allocationStatsByRole(); + public AllocationStats allocationStatsByRole() { + return lastReconciliationAllocationStats; } private List getUnassignedShardsMetrics() { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 70389efc41a74..65d7bf45139ea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -438,7 +438,7 @@ public DesiredBalanceStats getStats() { ); } - public Map getAllocationStatsByRole() { + public DesiredBalanceMetrics.AllocationStats getAllocationStatsByRole() { return desiredBalanceMetrics.allocationStatsByRole(); } From e3e84bb8f9d37046a5019ff38f8d75b620d760ec Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 14 Aug 2025 12:12:46 +1000 Subject: [PATCH 11/11] Fix naming, some pedantry --- .../routing/allocation/allocator/DesiredBalanceMetrics.java | 4 ++-- .../allocation/allocator/DesiredBalanceShardsAllocator.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 8e7de32acfa31..158b4941781f4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -61,7 +61,7 @@ public long undesiredAllocationsExcludingShuttingDownNodes() { * @return a value in [0.0, 1.0] */ public double undesiredAllocationsRatio() { - long totalAllocations = totalAllocations(); + final long totalAllocations = totalAllocations(); if (totalAllocations == 0) { return 0; } @@ -256,7 +256,7 @@ public long undesiredAllocations() { return lastReconciliationAllocationStats.undesiredAllocationsExcludingShuttingDownNodes(); } - public AllocationStats allocationStatsByRole() { + public AllocationStats allocationStats() { return lastReconciliationAllocationStats; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 65d7bf45139ea..e0b927a84519c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -438,8 +438,8 @@ public DesiredBalanceStats getStats() { ); } - public DesiredBalanceMetrics.AllocationStats getAllocationStatsByRole() { - return desiredBalanceMetrics.allocationStatsByRole(); + public DesiredBalanceMetrics.AllocationStats getAllocationStats() { + return desiredBalanceMetrics.allocationStats(); } private void onNoLongerMaster() {