Skip to content

Commit c9b45d8

Browse files
Breakdown undesired allocations by shard routing role (#132235)
In order that we can prevent scale-down in stateless when there are undesired allocations specifically in the indexing tier Closes: ES-12221 Co-authored-by: Pooya Salehi <[email protected]>
1 parent 175cce4 commit c9b45d8

File tree

3 files changed

+99
-37
lines changed

3 files changed

+99
-37
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java

Lines changed: 77 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

1212
import org.elasticsearch.cluster.node.DiscoveryNode;
13+
import org.elasticsearch.cluster.routing.ShardRouting;
1314
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight;
1415
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
1516
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
@@ -20,6 +21,7 @@
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.ToLongFunction;
2325

2426
/**
2527
* Maintains balancer metrics and makes them accessible to the {@link MeterRegistry} and APM reporting. Metrics are updated
@@ -31,12 +33,63 @@ public class DesiredBalanceMetrics {
3133

3234
/**
3335
* @param unassignedShards Shards that are not assigned to any node.
36+
* @param allocationStatsByRole A breakdown of the allocations stats by {@link ShardRouting.Role}
37+
*/
38+
public record AllocationStats(long unassignedShards, Map<ShardRouting.Role, RoleAllocationStats> allocationStatsByRole) {
39+
40+
public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {
41+
this(
42+
unassignedShards,
43+
Map.of(ShardRouting.Role.DEFAULT, new RoleAllocationStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes))
44+
);
45+
}
46+
47+
public long totalAllocations() {
48+
return allocationStatsByRole.values().stream().mapToLong(RoleAllocationStats::totalAllocations).sum();
49+
}
50+
51+
public long undesiredAllocationsExcludingShuttingDownNodes() {
52+
return allocationStatsByRole.values()
53+
.stream()
54+
.mapToLong(RoleAllocationStats::undesiredAllocationsExcludingShuttingDownNodes)
55+
.sum();
56+
}
57+
58+
/**
59+
* Return the ratio of undesired allocations to the total number of allocations.
60+
*
61+
* @return a value in [0.0, 1.0]
62+
*/
63+
public double undesiredAllocationsRatio() {
64+
final long totalAllocations = totalAllocations();
65+
if (totalAllocations == 0) {
66+
return 0;
67+
}
68+
return undesiredAllocationsExcludingShuttingDownNodes() / (double) totalAllocations;
69+
}
70+
}
71+
72+
/**
3473
* @param totalAllocations Shards that are assigned to a node.
3574
* @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource
3675
* constraint per the {@link AllocationDeciders}. Excludes shards that must move
3776
* because of a node shutting down.
3877
*/
39-
public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}
78+
public record RoleAllocationStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {
79+
public static final RoleAllocationStats EMPTY = new RoleAllocationStats(0L, 0L);
80+
81+
/**
82+
* Return the ratio of undesired allocations to the total number of allocations.
83+
*
84+
* @return a value in [0.0, 1.0]
85+
*/
86+
public double undesiredAllocationsRatio() {
87+
if (totalAllocations == 0) {
88+
return 0.0;
89+
}
90+
return undesiredAllocationsExcludingShuttingDownNodes / (double) totalAllocations;
91+
}
92+
}
4093

4194
public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {
4295
public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0);
@@ -47,7 +100,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
47100
public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current";
48101
/** See {@link #totalAllocations} */
49102
public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current";
50-
/** See {@link #undesiredAllocationsExcludingShuttingDownNodes} */
103+
/** See {@link #undesiredAllocations} */
51104
public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current";
52105
/** {@link #UNDESIRED_ALLOCATION_COUNT_METRIC_NAME} / {@link #TOTAL_SHARDS_METRIC_NAME} */
53106
public static final String UNDESIRED_ALLOCATION_RATIO_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.ratio";
@@ -71,25 +124,14 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
71124
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
72125
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";
73126

74-
public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1);
127+
public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(0, Map.of());
75128

76129
private volatile boolean nodeIsMaster = false;
77130

78131
/**
79-
* Number of unassigned shards during last reconciliation
80-
*/
81-
private volatile long unassignedShards;
82-
83-
/**
84-
* Total number of assigned shards during last reconciliation
132+
* The stats from the most recent reconciliation
85133
*/
86-
private volatile long totalAllocations;
87-
88-
/**
89-
* Number of assigned shards during last reconciliation that are not allocated on a desired node and need to be moved.
90-
* This excludes shards that must be reassigned due to a shutting down node.
91-
*/
92-
private volatile long undesiredAllocationsExcludingShuttingDownNodes;
134+
private volatile AllocationStats lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS;
93135

94136
private final AtomicReference<Map<DiscoveryNode, NodeWeightStats>> weightStatsPerNodeRef = new AtomicReference<>(Map.of());
95137
private final AtomicReference<Map<DiscoveryNode, NodeAllocationStatsAndWeight>> allocationStatsPerNodeRef = new AtomicReference<>(
@@ -104,9 +146,7 @@ public void updateMetrics(
104146
assert allocationStats != null : "allocation stats cannot be null";
105147
assert weightStatsPerNode != null : "node balance weight stats cannot be null";
106148
if (allocationStats != EMPTY_ALLOCATION_STATS) {
107-
this.unassignedShards = allocationStats.unassignedShards;
108-
this.totalAllocations = allocationStats.totalAllocations;
109-
this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes;
149+
this.lastReconciliationAllocationStats = allocationStats;
110150
}
111151
weightStatsPerNodeRef.set(weightStatsPerNode);
112152
allocationStatsPerNodeRef.set(nodeAllocationStats);
@@ -205,19 +245,23 @@ public void setNodeIsMaster(boolean nodeIsMaster) {
205245
}
206246

207247
public long unassignedShards() {
208-
return unassignedShards;
248+
return lastReconciliationAllocationStats.unassignedShards();
209249
}
210250

211251
public long totalAllocations() {
212-
return totalAllocations;
252+
return lastReconciliationAllocationStats.totalAllocations();
213253
}
214254

215255
public long undesiredAllocations() {
216-
return undesiredAllocationsExcludingShuttingDownNodes;
256+
return lastReconciliationAllocationStats.undesiredAllocationsExcludingShuttingDownNodes();
257+
}
258+
259+
public AllocationStats allocationStats() {
260+
return lastReconciliationAllocationStats;
217261
}
218262

219263
private List<LongWithAttributes> getUnassignedShardsMetrics() {
220-
return getIfPublishing(unassignedShards);
264+
return getIfPublishing(AllocationStats::unassignedShards);
221265
}
222266

223267
private List<DoubleWithAttributes> getDesiredBalanceNodeWeightMetrics() {
@@ -346,25 +390,25 @@ private Map<String, Object> getNodeAttributes(DiscoveryNode node) {
346390
}
347391

348392
private List<LongWithAttributes> getTotalAllocationsMetrics() {
349-
return getIfPublishing(totalAllocations);
393+
return getIfPublishing(AllocationStats::totalAllocations);
350394
}
351395

352396
private List<LongWithAttributes> getUndesiredAllocationsExcludingShuttingDownNodesMetrics() {
353-
return getIfPublishing(undesiredAllocationsExcludingShuttingDownNodes);
397+
return getIfPublishing(AllocationStats::undesiredAllocationsExcludingShuttingDownNodes);
354398
}
355399

356-
private List<LongWithAttributes> getIfPublishing(long value) {
357-
if (nodeIsMaster) {
358-
return List.of(new LongWithAttributes(value));
400+
private List<LongWithAttributes> getIfPublishing(ToLongFunction<AllocationStats> value) {
401+
var currentStats = lastReconciliationAllocationStats;
402+
if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) {
403+
return List.of(new LongWithAttributes(value.applyAsLong(currentStats)));
359404
}
360405
return List.of();
361406
}
362407

363408
private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() {
364-
if (nodeIsMaster) {
365-
var total = totalAllocations;
366-
var undesired = undesiredAllocationsExcludingShuttingDownNodes;
367-
return List.of(new DoubleWithAttributes(total != 0 ? (double) undesired / total : 0.0));
409+
var currentStats = lastReconciliationAllocationStats;
410+
if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) {
411+
return List.of(new DoubleWithAttributes(currentStats.undesiredAllocationsRatio()));
368412
}
369413
return List.of();
370414
}
@@ -374,9 +418,7 @@ private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() {
374418
* This is best-effort because it is possible for {@link #updateMetrics} to race with this method.
375419
*/
376420
public void zeroAllMetrics() {
377-
unassignedShards = 0;
378-
totalAllocations = 0;
379-
undesiredAllocationsExcludingShuttingDownNodes = 0;
421+
lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS;
380422
weightStatsPerNodeRef.set(Map.of());
381423
allocationStatsPerNodeRef.set(Map.of());
382424
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import com.carrotsearch.hppc.ObjectLongHashMap;
13+
import com.carrotsearch.hppc.ObjectLongMap;
14+
1215
import org.apache.logging.log4j.LogManager;
1316
import org.apache.logging.log4j.Logger;
1417
import org.apache.lucene.util.ArrayUtil;
@@ -40,6 +43,7 @@
4043
import java.util.function.BiFunction;
4144
import java.util.stream.Collectors;
4245
import java.util.stream.IntStream;
46+
import java.util.stream.StreamSupport;
4347

4448
import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
4549
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
@@ -525,6 +529,8 @@ private DesiredBalanceMetrics.AllocationStats balance() {
525529
int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
526530
int totalAllocations = 0;
527531
int undesiredAllocationsExcludingShuttingDownNodes = 0;
532+
final ObjectLongMap<ShardRouting.Role> totalAllocationsByRole = new ObjectLongHashMap<>();
533+
final ObjectLongMap<ShardRouting.Role> undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>();
528534

529535
// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
530536
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
@@ -533,6 +539,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
533539
final var shardRouting = iterator.next();
534540

535541
totalAllocations++;
542+
totalAllocationsByRole.addTo(shardRouting.role(), 1);
536543

537544
if (shardRouting.started() == false) {
538545
// can only rebalance started shards
@@ -553,6 +560,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
553560
if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) {
554561
// shard is not on a shutting down node, nor is it on a desired node per the previous check.
555562
undesiredAllocationsExcludingShuttingDownNodes++;
563+
undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1);
556564
}
557565

558566
if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
@@ -594,8 +602,16 @@ private DesiredBalanceMetrics.AllocationStats balance() {
594602
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
595603
return new DesiredBalanceMetrics.AllocationStats(
596604
unassignedShards,
597-
totalAllocations,
598-
undesiredAllocationsExcludingShuttingDownNodes
605+
StreamSupport.stream(totalAllocationsByRole.spliterator(), false)
606+
.collect(
607+
Collectors.toUnmodifiableMap(
608+
lc -> lc.key,
609+
lc -> new DesiredBalanceMetrics.RoleAllocationStats(
610+
totalAllocationsByRole.get(lc.key),
611+
undesiredAllocationsExcludingShuttingDownNodesByRole.get(lc.key)
612+
)
613+
)
614+
)
599615
);
600616
}
601617

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,10 @@ public DesiredBalanceStats getStats() {
438438
);
439439
}
440440

441+
public DesiredBalanceMetrics.AllocationStats getAllocationStats() {
442+
return desiredBalanceMetrics.allocationStats();
443+
}
444+
441445
private void onNoLongerMaster() {
442446
if (indexGenerator.getAndSet(-1) != -1) {
443447
currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER);

0 commit comments

Comments
 (0)