diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index 5771c27c5d5ab..158b4941781f4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; @@ -20,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.ToLongFunction; /** * Maintains balancer metrics and makes them accessible to the {@link MeterRegistry} and APM reporting. Metrics are updated @@ -31,12 +33,63 @@ public class DesiredBalanceMetrics { /** * @param unassignedShards Shards that are not assigned to any node. 
+ * @param allocationStatsByRole A breakdown of the allocations stats by {@link ShardRouting.Role} + */ + public record AllocationStats(long unassignedShards, Map<ShardRouting.Role, RoleAllocationStats> allocationStatsByRole) { + + public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { + this( + unassignedShards, + Map.of(ShardRouting.Role.DEFAULT, new RoleAllocationStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes)) + ); + } + + public long totalAllocations() { + return allocationStatsByRole.values().stream().mapToLong(RoleAllocationStats::totalAllocations).sum(); + } + + public long undesiredAllocationsExcludingShuttingDownNodes() { + return allocationStatsByRole.values() + .stream() + .mapToLong(RoleAllocationStats::undesiredAllocationsExcludingShuttingDownNodes) + .sum(); + } + + /** + * Return the ratio of undesired allocations to the total number of allocations. + * + * @return a value in [0.0, 1.0] + */ + public double undesiredAllocationsRatio() { + final long totalAllocations = totalAllocations(); + if (totalAllocations == 0) { + return 0; + } + return undesiredAllocationsExcludingShuttingDownNodes() / (double) totalAllocations; + } + } + + /** * @param totalAllocations Shards that are assigned to a node. * @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource * constraint per the {@link AllocationDeciders}. Excludes shards that must move * because of a node shutting down. */ - public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {} + public record RoleAllocationStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) { + public static final RoleAllocationStats EMPTY = new RoleAllocationStats(0L, 0L); + + /** + * Return the ratio of undesired allocations to the total number of allocations. 
+ * + * @return a value in [0.0, 1.0] + */ + public double undesiredAllocationsRatio() { + if (totalAllocations == 0) { + return 0.0; + } + return undesiredAllocationsExcludingShuttingDownNodes / (double) totalAllocations; + } + } public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) { public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0); @@ -47,7 +100,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current"; /** See {@link #totalAllocations} */ public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current"; - /** See {@link #undesiredAllocationsExcludingShuttingDownNodes} */ + /** See {@link #undesiredAllocations} */ public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current"; /** {@link #UNDESIRED_ALLOCATION_COUNT_METRIC_NAME} / {@link #TOTAL_SHARDS_METRIC_NAME} */ public static final String UNDESIRED_ALLOCATION_RATIO_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.ratio"; @@ -71,25 +124,14 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.forecasted_disk_usage_bytes.current"; - public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1); + public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(0, Map.of()); private volatile boolean nodeIsMaster = false; /** - * Number of unassigned shards during last reconciliation - */ - private volatile long unassignedShards; - - /** - * Total number of assigned shards during last reconciliation + * The stats from the most recent reconciliation */ - private volatile long totalAllocations; - 
- /** - * Number of assigned shards during last reconciliation that are not allocated on a desired node and need to be moved. - * This excludes shards that must be reassigned due to a shutting down node. - */ - private volatile long undesiredAllocationsExcludingShuttingDownNodes; + private volatile AllocationStats lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS; private final AtomicReference<Map<DiscoveryNode, NodeWeightStats>> weightStatsPerNodeRef = new AtomicReference<>(Map.of()); private final AtomicReference<Map<DiscoveryNode, NodeAllocationStatsAndWeight>> allocationStatsPerNodeRef = new AtomicReference<>( @@ -104,9 +146,7 @@ public void updateMetrics( assert allocationStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; if (allocationStats != EMPTY_ALLOCATION_STATS) { - this.unassignedShards = allocationStats.unassignedShards; - this.totalAllocations = allocationStats.totalAllocations; - this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes; + this.lastReconciliationAllocationStats = allocationStats; } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); @@ -205,19 +245,23 @@ public void setNodeIsMaster(boolean nodeIsMaster) { } public long unassignedShards() { - return unassignedShards; + return lastReconciliationAllocationStats.unassignedShards(); } public long totalAllocations() { - return totalAllocations; + return lastReconciliationAllocationStats.totalAllocations(); } public long undesiredAllocations() { - return undesiredAllocationsExcludingShuttingDownNodes; + return lastReconciliationAllocationStats.undesiredAllocationsExcludingShuttingDownNodes(); + } + + public AllocationStats allocationStats() { + return lastReconciliationAllocationStats; } private List<LongWithAttributes> getUnassignedShardsMetrics() { - return getIfPublishing(unassignedShards); + return getIfPublishing(AllocationStats::unassignedShards); } private List 
getDesiredBalanceNodeWeightMetrics() { @@ -346,25 +390,25 @@ private Map getNodeAttributes(DiscoveryNode node) { } private List<LongWithAttributes> getTotalAllocationsMetrics() { - return getIfPublishing(totalAllocations); + return getIfPublishing(AllocationStats::totalAllocations); } private List<LongWithAttributes> getUndesiredAllocationsExcludingShuttingDownNodesMetrics() { - return getIfPublishing(undesiredAllocationsExcludingShuttingDownNodes); + return getIfPublishing(AllocationStats::undesiredAllocationsExcludingShuttingDownNodes); } - private List<LongWithAttributes> getIfPublishing(long value) { - if (nodeIsMaster) { - return List.of(new LongWithAttributes(value)); + private List<LongWithAttributes> getIfPublishing(ToLongFunction<AllocationStats> value) { + var currentStats = lastReconciliationAllocationStats; + if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) { + return List.of(new LongWithAttributes(value.applyAsLong(currentStats))); } return List.of(); } private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() { - if (nodeIsMaster) { - var total = totalAllocations; - var undesired = undesiredAllocationsExcludingShuttingDownNodes; - return List.of(new DoubleWithAttributes(total != 0 ? (double) undesired / total : 0.0)); + var currentStats = lastReconciliationAllocationStats; + if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) { + return List.of(new DoubleWithAttributes(currentStats.undesiredAllocationsRatio())); } return List.of(); } @@ -374,9 +418,7 @@ private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() { * This is best-effort because it is possible for {@link #updateMetrics} to race with this method. 
*/ public void zeroAllMetrics() { - unassignedShards = 0; - totalAllocations = 0; - undesiredAllocationsExcludingShuttingDownNodes = 0; + lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS; weightStatsPerNodeRef.set(Map.of()); allocationStatsPerNodeRef.set(Map.of()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 8e69d72777f04..c55e2a23ab8fa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -9,6 +9,9 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.ArrayUtil; @@ -40,6 +43,7 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE; import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize; @@ -525,6 +529,8 @@ private DesiredBalanceMetrics.AllocationStats balance() { int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); int totalAllocations = 0; int undesiredAllocationsExcludingShuttingDownNodes = 0; + final ObjectLongMap<ShardRouting.Role> totalAllocationsByRole = new ObjectLongHashMap<>(); + final ObjectLongMap<ShardRouting.Role> undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>(); // Iterate over all started shards and try to move any which are on undesired nodes. 
In the presence of throttling shard // movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the @@ -533,6 +539,7 @@ private DesiredBalanceMetrics.AllocationStats balance() { final var shardRouting = iterator.next(); totalAllocations++; + totalAllocationsByRole.addTo(shardRouting.role(), 1); if (shardRouting.started() == false) { // can only rebalance started shards @@ -553,6 +560,7 @@ private DesiredBalanceMetrics.AllocationStats balance() { if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { // shard is not on a shutting down node, nor is it on a desired node per the previous check. undesiredAllocationsExcludingShuttingDownNodes++; + undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1); } if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { @@ -594,8 +602,16 @@ private DesiredBalanceMetrics.AllocationStats balance() { maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); return new DesiredBalanceMetrics.AllocationStats( unassignedShards, - totalAllocations, - undesiredAllocationsExcludingShuttingDownNodes + StreamSupport.stream(totalAllocationsByRole.spliterator(), false) + .collect( + Collectors.toUnmodifiableMap( + lc -> lc.key, + lc -> new DesiredBalanceMetrics.RoleAllocationStats( + totalAllocationsByRole.get(lc.key), + undesiredAllocationsExcludingShuttingDownNodesByRole.get(lc.key) + ) + ) + ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 515da761d8696..e0b927a84519c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -438,6 +438,10 @@ public DesiredBalanceStats getStats() { ); } + public DesiredBalanceMetrics.AllocationStats getAllocationStats() { + return desiredBalanceMetrics.allocationStats(); + } + private void onNoLongerMaster() { if (indexGenerator.getAndSet(-1) != -1) { currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER);