Skip to content

Breakdown undesired allocations by shard routing role #132235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
Expand All @@ -20,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToLongFunction;

/**
* Maintains balancer metrics and makes them accessible to the {@link MeterRegistry} and APM reporting. Metrics are updated
Expand All @@ -31,12 +33,63 @@ public class DesiredBalanceMetrics {

/**
* @param unassignedShards Shards that are not assigned to any node.
* @param allocationStatsByRole A breakdown of the allocations stats by {@link ShardRouting.Role}
*/
public record AllocationStats(long unassignedShards, Map<ShardRouting.Role, RoleAllocationStats> allocationStatsByRole) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're replacing the totals with the role-based map, doesn't the current version implicitly assume that default and search_only/index_only are mutually exclusive? Shouldn't we assert that either the counts for default are empty or the counts for both search_only and index_only are empty, so that if this assumption ever stops holding, totalAllocations() and undesiredAllocationsExcludingShuttingDownNodes() don't silently return something wrong? That's what I was saying in my previous comment.


/**
 * Convenience constructor for callers that only have overall totals: the two allocation counts are stored
 * as a single {@link RoleAllocationStats} entry keyed by {@code ShardRouting.Role.DEFAULT}.
 *
 * @param unassignedShards                               passed through unchanged to the canonical constructor
 * @param totalAllocations                               total count recorded for the single map entry
 * @param undesiredAllocationsExcludingShuttingDownNodes undesired count recorded for the single map entry
 */
public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {
    this(
        unassignedShards,
        Map.of(
            ShardRouting.Role.DEFAULT,
            new RoleAllocationStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes)
        )
    );
}

/**
 * Adds up {@link RoleAllocationStats#totalAllocations()} over every entry in {@code allocationStatsByRole}.
 *
 * @return the sum across all roles, or 0 when the map is empty
 */
public long totalAllocations() {
    long sum = 0L;
    for (RoleAllocationStats perRole : allocationStatsByRole.values()) {
        sum += perRole.totalAllocations();
    }
    return sum;
}

/**
 * Adds up {@link RoleAllocationStats#undesiredAllocationsExcludingShuttingDownNodes()} over every entry in
 * {@code allocationStatsByRole}.
 *
 * @return the sum across all roles, or 0 when the map is empty
 */
public long undesiredAllocationsExcludingShuttingDownNodes() {
    long sum = 0L;
    for (RoleAllocationStats perRole : allocationStatsByRole.values()) {
        sum += perRole.undesiredAllocationsExcludingShuttingDownNodes();
    }
    return sum;
}

/**
 * Computes the share of allocations, summed over all roles, that are undesired.
 *
 * @return undesired allocations divided by total allocations, or 0.0 when there are no allocations
 */
public double undesiredAllocationsRatio() {
    final long allocated = totalAllocations();
    // Guard the division: with nothing allocated the ratio is defined as zero.
    return allocated == 0 ? 0.0 : undesiredAllocationsExcludingShuttingDownNodes() / (double) allocated;
}
}

/**
* @param totalAllocations Shards that are assigned to a node.
* @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource
* constraint per the {@link AllocationDeciders}. Excludes shards that must move
* because of a node shutting down.
*/
public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}
public record RoleAllocationStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {
    /** Shared instance with both counters at zero. */
    public static final RoleAllocationStats EMPTY = new RoleAllocationStats(0L, 0L);

    /**
     * Computes the share of this role's allocations that are undesired.
     *
     * @return undesired allocations divided by total allocations, or 0.0 when there are no allocations
     */
    public double undesiredAllocationsRatio() {
        // Guard the division: with nothing allocated the ratio is defined as zero.
        return totalAllocations != 0 ? (double) undesiredAllocationsExcludingShuttingDownNodes / totalAllocations : 0.0;
    }
}

public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {
public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0);
Expand All @@ -47,7 +100,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current";
/** See {@link #totalAllocations} */
public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current";
/** See {@link #undesiredAllocationsExcludingShuttingDownNodes} */
/** See {@link #undesiredAllocations} */
public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current";
/** {@link #UNDESIRED_ALLOCATION_COUNT_METRIC_NAME} / {@link #TOTAL_SHARDS_METRIC_NAME} */
public static final String UNDESIRED_ALLOCATION_RATIO_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.ratio";
Expand All @@ -71,25 +124,14 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";

public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1);
public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(0, Map.of());

private volatile boolean nodeIsMaster = false;

/**
* Number of unassigned shards during last reconciliation
*/
private volatile long unassignedShards;

/**
* Total number of assigned shards during last reconciliation
* The stats from the most recent reconciliation
*/
private volatile long totalAllocations;

/**
* Number of assigned shards during last reconciliation that are not allocated on a desired node and need to be moved.
* This excludes shards that must be reassigned due to a shutting down node.
*/
private volatile long undesiredAllocationsExcludingShuttingDownNodes;
private volatile AllocationStats lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS;

private final AtomicReference<Map<DiscoveryNode, NodeWeightStats>> weightStatsPerNodeRef = new AtomicReference<>(Map.of());
private final AtomicReference<Map<DiscoveryNode, NodeAllocationStatsAndWeight>> allocationStatsPerNodeRef = new AtomicReference<>(
Expand All @@ -104,9 +146,7 @@ public void updateMetrics(
assert allocationStats != null : "allocation stats cannot be null";
assert weightStatsPerNode != null : "node balance weight stats cannot be null";
if (allocationStats != EMPTY_ALLOCATION_STATS) {
this.unassignedShards = allocationStats.unassignedShards;
this.totalAllocations = allocationStats.totalAllocations;
this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes;
this.lastReconciliationAllocationStats = allocationStats;
}
weightStatsPerNodeRef.set(weightStatsPerNode);
allocationStatsPerNodeRef.set(nodeAllocationStats);
Expand Down Expand Up @@ -205,19 +245,23 @@ public void setNodeIsMaster(boolean nodeIsMaster) {
}

/**
 * Returns the unassigned shard count.
 * NOTE(review): this hunk shows both sides of the diff; the two return statements are alternatives, not sequential code.
 */
public long unassignedShards() {
// Appears to be the pre-change body: reads the dedicated unassignedShards field.
return unassignedShards;
// Appears to be the post-change body: reads the count from the stored AllocationStats.
return lastReconciliationAllocationStats.unassignedShards();
}

/**
 * Returns the total allocation count.
 * NOTE(review): this hunk shows both sides of the diff; the two return statements are alternatives, not sequential code.
 */
public long totalAllocations() {
// Appears to be the pre-change body: reads the dedicated totalAllocations field.
return totalAllocations;
// Appears to be the post-change body: asks the stored AllocationStats for its total.
return lastReconciliationAllocationStats.totalAllocations();
}

/**
 * Returns the undesired allocation count, excluding shutting-down nodes.
 * NOTE(review): this hunk shows both sides of the diff; the two return statements are alternatives, not sequential code.
 */
public long undesiredAllocations() {
// Appears to be the pre-change body: reads the dedicated field.
return undesiredAllocationsExcludingShuttingDownNodes;
// Appears to be the post-change body: asks the stored AllocationStats for its undesired count.
return lastReconciliationAllocationStats.undesiredAllocationsExcludingShuttingDownNodes();
}

/**
 * Exposes the per-role breakdown held by the currently stored {@link AllocationStats}.
 *
 * @return the role-to-stats map of {@code lastReconciliationAllocationStats}
 */
public Map<ShardRouting.Role, RoleAllocationStats> allocationStatsByRole() {
    final AllocationStats latest = lastReconciliationAllocationStats;
    return latest.allocationStatsByRole();
}

/**
 * Builds the unassigned-shards measurement list by delegating to {@code getIfPublishing}.
 * NOTE(review): this hunk shows both sides of the diff; the two return statements are alternatives, not sequential code.
 */
private List<LongWithAttributes> getUnassignedShardsMetrics() {
// Appears to be the pre-change call: passes the field's value directly.
return getIfPublishing(unassignedShards);
// Appears to be the post-change call: passes an accessor that getIfPublishing applies to the stored stats.
return getIfPublishing(AllocationStats::unassignedShards);
}

private List<DoubleWithAttributes> getDesiredBalanceNodeWeightMetrics() {
Expand Down Expand Up @@ -346,25 +390,25 @@ private Map<String, Object> getNodeAttributes(DiscoveryNode node) {
}

/**
 * Builds the total-allocations measurement list by delegating to {@code getIfPublishing}.
 * NOTE(review): this hunk shows both sides of the diff; the two return statements are alternatives, not sequential code.
 */
private List<LongWithAttributes> getTotalAllocationsMetrics() {
// Appears to be the pre-change call: passes the field's value directly.
return getIfPublishing(totalAllocations);
// Appears to be the post-change call: passes an accessor that getIfPublishing applies to the stored stats.
return getIfPublishing(AllocationStats::totalAllocations);
}

/**
 * Builds the undesired-allocations measurement list by delegating to {@code getIfPublishing}.
 * NOTE(review): this hunk shows both sides of the diff; the two return statements are alternatives, not sequential code.
 */
private List<LongWithAttributes> getUndesiredAllocationsExcludingShuttingDownNodesMetrics() {
// Appears to be the pre-change call: passes the field's value directly.
return getIfPublishing(undesiredAllocationsExcludingShuttingDownNodes);
// Appears to be the post-change call: passes an accessor that getIfPublishing applies to the stored stats.
return getIfPublishing(AllocationStats::undesiredAllocationsExcludingShuttingDownNodes);
}

/**
 * Wraps a long value in a single-element list when this node should publish, otherwise returns an empty list.
 * NOTE(review): this hunk interleaves two versions of the method; the first three lines appear to be the pre-change
 * signature and body, the following four their replacement. The closing lines are shared by both.
 */
// Appears to be the pre-change version: takes an already-read value and publishes whenever nodeIsMaster is true.
private List<LongWithAttributes> getIfPublishing(long value) {
if (nodeIsMaster) {
return List.of(new LongWithAttributes(value));
// Appears to be the post-change version: takes an accessor over AllocationStats.
private List<LongWithAttributes> getIfPublishing(ToLongFunction<AllocationStats> value) {
// Copy the field into a local so the sentinel check and the value extraction use the same stats object.
var currentStats = lastReconciliationAllocationStats;
// Identity comparison against the EMPTY_ALLOCATION_STATS sentinel: nothing is published while it is stored.
if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) {
return List.of(new LongWithAttributes(value.applyAsLong(currentStats)));
}
return List.of();
}

/**
 * Builds the undesired-allocations ratio measurement list, or an empty list when nothing should be published.
 * NOTE(review): this hunk shows both sides of the diff; the first four body lines appear to be the pre-change
 * code and the next four their replacement. The closing lines are shared by both.
 */
private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() {
// Appears to be the pre-change body: reads the two fields and computes the ratio inline, yielding 0.0 for a zero total.
if (nodeIsMaster) {
var total = totalAllocations;
var undesired = undesiredAllocationsExcludingShuttingDownNodes;
return List.of(new DoubleWithAttributes(total != 0 ? (double) undesired / total : 0.0));
// Appears to be the post-change body: copies the stored stats into a local and lets AllocationStats compute the ratio.
var currentStats = lastReconciliationAllocationStats;
// Identity comparison against the EMPTY_ALLOCATION_STATS sentinel: nothing is published while it is stored.
if (nodeIsMaster && currentStats != EMPTY_ALLOCATION_STATS) {
return List.of(new DoubleWithAttributes(currentStats.undesiredAllocationsRatio()));
}
return List.of();
}
Expand All @@ -374,9 +418,7 @@ private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() {
* This is best-effort because it is possible for {@link #updateMetrics} to race with this method.
*/
// NOTE(review): this hunk shows both sides of the diff; the three field assignments appear to be the pre-change
// code and the single lastReconciliationAllocationStats assignment their replacement.
public void zeroAllMetrics() {
// Appears to be the pre-change reset: zeroes each counter field individually.
unassignedShards = 0;
totalAllocations = 0;
undesiredAllocationsExcludingShuttingDownNodes = 0;
// Appears to be the post-change reset: stores the EMPTY_ALLOCATION_STATS sentinel instead.
lastReconciliationAllocationStats = EMPTY_ALLOCATION_STATS;
// Both versions then replace the per-node maps with empty maps.
weightStatsPerNodeRef.set(Map.of());
allocationStatsPerNodeRef.set(Map.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.ArrayUtil;
Expand Down Expand Up @@ -40,6 +43,7 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
Expand Down Expand Up @@ -525,6 +529,8 @@ private DesiredBalanceMetrics.AllocationStats balance() {
int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
int totalAllocations = 0;
int undesiredAllocationsExcludingShuttingDownNodes = 0;
final ObjectLongMap<ShardRouting.Role> totalAllocationsByRole = new ObjectLongHashMap<>();
final ObjectLongMap<ShardRouting.Role> undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>();

// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
Expand All @@ -533,6 +539,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
final var shardRouting = iterator.next();

totalAllocations++;
totalAllocationsByRole.addTo(shardRouting.role(), 1);

if (shardRouting.started() == false) {
// can only rebalance started shards
Expand All @@ -553,6 +560,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) {
// shard is not on a shutting down node, nor is it on a desired node per the previous check.
undesiredAllocationsExcludingShuttingDownNodes++;
undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1);
}

if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
Expand Down Expand Up @@ -594,8 +602,16 @@ private DesiredBalanceMetrics.AllocationStats balance() {
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
return new DesiredBalanceMetrics.AllocationStats(
unassignedShards,
totalAllocations,
undesiredAllocationsExcludingShuttingDownNodes
StreamSupport.stream(totalAllocationsByRole.spliterator(), false)
.collect(
Collectors.toUnmodifiableMap(
lc -> lc.key,
lc -> new DesiredBalanceMetrics.RoleAllocationStats(
totalAllocationsByRole.get(lc.key),
undesiredAllocationsExcludingShuttingDownNodesByRole.get(lc.key)
)
)
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ public DesiredBalanceStats getStats() {
);
}

/**
 * Returns the per-role allocation stats currently held by {@code desiredBalanceMetrics}.
 *
 * @return the map produced by {@code DesiredBalanceMetrics#allocationStatsByRole()}
 */
public Map<ShardRouting.Role, DesiredBalanceMetrics.RoleAllocationStats> getAllocationStatsByRole() {
    final Map<ShardRouting.Role, DesiredBalanceMetrics.RoleAllocationStats> statsByRole = desiredBalanceMetrics.allocationStatsByRole();
    return statsByRole;
}

private void onNoLongerMaster() {
if (indexGenerator.getAndSet(-1) != -1) {
currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER);
Expand Down