From a1899a32fa64d3419e46f7e1b8e4d802d94fd3a4 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 18 Jul 2025 14:00:02 -0700 Subject: [PATCH 1/4] Create a new monitor for node-level write load Plugs a basic NodeUsageStatsForThreadPoolsMonitor into the code and sets it up with access to information it will need to implement ES-11992. Relates ES-11991 --- .../NodeUsageStatsForThreadPoolsMonitor.java | 89 +++++++++++++++++++ .../WriteLoadConstraintSettings.java | 25 ++++++ .../elasticsearch/node/NodeConstruction.java | 10 +++ ...rnalClusterInfoServiceSchedulingTests.java | 17 ++++ 4 files changed, 141 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java new file mode 100644 index 0000000000000..8dea6caf7b316 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.gateway.GatewayService; + +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +/** + * Monitors the node-level thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via + * {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds. + * + * TODO (ES-11992): implement + */ +public class NodeUsageStatsForThreadPoolsMonitor { + private static final Logger logger = LogManager.getLogger(NodeUsageStatsForThreadPoolsMonitor.class); + private final WriteLoadConstraintSettings writeLoadConstraintSettings; + private final Supplier clusterStateSupplier; + private final LongSupplier currentTimeMillisSupplier; + private final RerouteService rerouteService; + + public NodeUsageStatsForThreadPoolsMonitor( + ClusterSettings clusterSettings, + LongSupplier currentTimeMillisSupplier, + Supplier clusterStateSupplier, + RerouteService rerouteService + ) { + this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings); + this.clusterStateSupplier = clusterStateSupplier; + this.currentTimeMillisSupplier = currentTimeMillisSupplier; + this.rerouteService = rerouteService; + } + + /** + * Receives a copy of the latest {@link ClusterInfo} whenever the {@link ClusterInfoService} collects it. Processes the new + * {@link org.elasticsearch.cluster.NodeUsageStatsForThreadPools} and initiates rebalancing, via reroute, if a node in the cluster + * exceeds thread pool usage thresholds. + */ + public void onNewInfo(ClusterInfo clusterInfo) { + final ClusterState state = clusterStateSupplier.get(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + logger.debug("skipping monitor as the cluster state is not recovered yet"); + return; + } + + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) { + logger.trace("skipping monitor because the write load decider is disabled"); + return; + } + + logger.trace("processing new cluster info"); + + boolean reroute = false; + String explanation = ""; + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + + // TODO (ES-11992): implement + + if (reroute) { + logger.debug("rerouting shards: [{}]", explanation); + rerouteService.reroute("disk threshold monitor", Priority.NORMAL, ActionListener.wrap(ignored -> { + final var reroutedClusterState = clusterStateSupplier.get(); + + // TODO (ES-11992): implement + + }, e -> logger.debug("reroute failed", e))); + } else { + logger.trace("no reroute required"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index cba02ed207b81..23e1cb563f9fd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.RatioValue; import org.elasticsearch.core.TimeValue; @@ -98,4 +99,28 @@ public enum WriteLoadDeciderStatus { Setting.Property.Dynamic, Setting.Property.NodeScope ); + + WriteLoadDeciderStatus writeLoadDeciderStatus; + TimeValue writeLoadDeciderRerouteIntervalSetting; + + WriteLoadConstraintSettings(ClusterSettings clusterSettings) { + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled); + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting); + }; + + private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) { + this.writeLoadDeciderStatus = status; + } + + public WriteLoadDeciderStatus getWriteLoadConstraintEnabled() { + return this.writeLoadDeciderStatus; + } + + public TimeValue getWriteLoadDeciderRerouteIntervalSetting() { + return this.writeLoadDeciderRerouteIntervalSetting; + } + + private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) { + this.writeLoadDeciderRerouteIntervalSetting = timeValue; + } } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index bb28ed4a8aff5..5d30a30e22c5d 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -60,6 +60,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; +import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.version.CompatibilityVersions; @@ -785,6 +786,15 @@ private void construct( )::onNewInfo ); + clusterInfoService.addListener( + new NodeUsageStatsForThreadPoolsMonitor( + clusterService.getClusterSettings(), + threadPool.relativeTimeInMillisSupplier(), + clusterService::state, + rerouteService + )::onNewInfo + ); + IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class).toList()); modules.add(indicesModule); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 6e80e0d087993..45c94cf464ecd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -21,11 +21,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; @@ -95,6 +98,18 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { mockEstimatedHeapUsageCollector, mockNodeUsageStatsForThreadPoolsCollector ); + final NodeUsageStatsForThreadPoolsMonitor usageMonitor = spy( + new NodeUsageStatsForThreadPoolsMonitor( + clusterService.getClusterSettings(), + threadPool.relativeTimeInMillisSupplier(), + clusterService::state, + new RerouteService() { + @Override + public void reroute(String reason, Priority priority, ActionListener listener) {} + } + ) + ); + clusterInfoService.addListener(usageMonitor::onNewInfo); clusterService.addListener(clusterInfoService); clusterInfoService.addListener(ignored -> {}); @@ -132,6 +147,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { for (int i = 0; i < 3; i++) { Mockito.clearInvocations(mockEstimatedHeapUsageCollector); Mockito.clearInvocations(mockNodeUsageStatsForThreadPoolsCollector); + Mockito.clearInvocations(usageMonitor); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); @@ -139,6 +155,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(any()); + verify(usageMonitor).onNewInfo(any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); From 9bcf8abe5f7ab9db231fd8d10c4510c9c2977114 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 18 Jul 2025 21:12:30 +0000 Subject: [PATCH 2/4] [CI] Auto commit changes from spotless --- .../routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java index 8dea6caf7b316..c3911edda0da8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.RerouteService; -import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.gateway.GatewayService; From 14454c14bf6b497a5cddbae60af8991490f3465e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 21 Jul 2025 12:01:09 -0700 Subject: [PATCH 3/4] rename class --- ...eadPoolsMonitor.java => WriteLoadConstraintMonitor.java} | 6 +++--- .../main/java/org/elasticsearch/node/NodeConstruction.java | 4 ++-- .../cluster/InternalClusterInfoServiceSchedulingTests.java | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) rename server/src/main/java/org/elasticsearch/cluster/routing/allocation/{NodeUsageStatsForThreadPoolsMonitor.java => WriteLoadConstraintMonitor.java} (94%) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java similarity index 94% rename from server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java rename to server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java index c3911edda0da8..081d082c647a5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeUsageStatsForThreadPoolsMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java @@ -29,14 +29,14 @@ * * TODO (ES-11992): implement */ -public class NodeUsageStatsForThreadPoolsMonitor { - private static final Logger logger = LogManager.getLogger(NodeUsageStatsForThreadPoolsMonitor.class); +public class WriteLoadConstraintMonitor { + private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class); private final WriteLoadConstraintSettings writeLoadConstraintSettings; private final Supplier clusterStateSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; - public NodeUsageStatsForThreadPoolsMonitor( + public WriteLoadConstraintMonitor( ClusterSettings clusterSettings, LongSupplier currentTimeMillisSupplier, Supplier clusterStateSupplier, diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 5d30a30e22c5d..5f548f9701956 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -60,7 +60,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; -import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.version.CompatibilityVersions; @@ -787,7 +787,7 @@ private void construct( ); clusterInfoService.addListener( - new NodeUsageStatsForThreadPoolsMonitor( + new WriteLoadConstraintMonitor( clusterService.getClusterSettings(), threadPool.relativeTimeInMillisSupplier(), clusterService::state, diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 45c94cf464ecd..d5dbe23b7994d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -22,7 +22,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RerouteService; -import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; @@ -98,8 +98,8 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { mockEstimatedHeapUsageCollector, mockNodeUsageStatsForThreadPoolsCollector ); - final NodeUsageStatsForThreadPoolsMonitor usageMonitor = spy( - new NodeUsageStatsForThreadPoolsMonitor( + final WriteLoadConstraintMonitor usageMonitor = spy( + new WriteLoadConstraintMonitor( clusterService.getClusterSettings(), threadPool.relativeTimeInMillisSupplier(), clusterService::state, From 8bd66fe9e15c81bcaea89b82505eb455104c3df2 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 21 Jul 2025 12:01:47 -0700 Subject: [PATCH 4/4] comment touch up --- .../cluster/routing/allocation/WriteLoadConstraintMonitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java index 081d082c647a5..4826aeda574de 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java @@ -24,7 +24,7 @@ import java.util.function.Supplier; /** - * Monitors the node-level thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via + * Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via * {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds. * * TODO (ES-11992): implement