Skip to content

Commit 79bdc32

Browse files
Create a new monitor for node-level write load (#131560)
Plugs a basic WriteLoadConstraintMonitor into the code and sets it up with access to information it will need to implement ES-11992. Closes ES-11991
1 parent 5a38160 commit 79bdc32

File tree

4 files changed

+140
-0
lines changed

4 files changed

+140
-0
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.cluster.ClusterInfo;
16+
import org.elasticsearch.cluster.ClusterInfoService;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.routing.RerouteService;
19+
import org.elasticsearch.common.Priority;
20+
import org.elasticsearch.common.settings.ClusterSettings;
21+
import org.elasticsearch.gateway.GatewayService;
22+
23+
import java.util.function.LongSupplier;
24+
import java.util.function.Supplier;
25+
26+
/**
27+
* Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via
28+
* {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
29+
*
30+
* TODO (ES-11992): implement
31+
*/
32+
public class WriteLoadConstraintMonitor {
33+
private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class);
34+
private final WriteLoadConstraintSettings writeLoadConstraintSettings;
35+
private final Supplier<ClusterState> clusterStateSupplier;
36+
private final LongSupplier currentTimeMillisSupplier;
37+
private final RerouteService rerouteService;
38+
39+
public WriteLoadConstraintMonitor(
40+
ClusterSettings clusterSettings,
41+
LongSupplier currentTimeMillisSupplier,
42+
Supplier<ClusterState> clusterStateSupplier,
43+
RerouteService rerouteService
44+
) {
45+
this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings);
46+
this.clusterStateSupplier = clusterStateSupplier;
47+
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
48+
this.rerouteService = rerouteService;
49+
}
50+
51+
/**
52+
* Receives a copy of the latest {@link ClusterInfo} whenever the {@link ClusterInfoService} collects it. Processes the new
53+
* {@link org.elasticsearch.cluster.NodeUsageStatsForThreadPools} and initiates rebalancing, via reroute, if a node in the cluster
54+
* exceeds thread pool usage thresholds.
55+
*/
56+
public void onNewInfo(ClusterInfo clusterInfo) {
57+
final ClusterState state = clusterStateSupplier.get();
58+
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
59+
logger.debug("skipping monitor as the cluster state is not recovered yet");
60+
return;
61+
}
62+
63+
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) {
64+
logger.trace("skipping monitor because the write load decider is disabled");
65+
return;
66+
}
67+
68+
logger.trace("processing new cluster info");
69+
70+
boolean reroute = false;
71+
String explanation = "";
72+
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
73+
74+
// TODO (ES-11992): implement
75+
76+
if (reroute) {
77+
logger.debug("rerouting shards: [{}]", explanation);
78+
rerouteService.reroute("disk threshold monitor", Priority.NORMAL, ActionListener.wrap(ignored -> {
79+
final var reroutedClusterState = clusterStateSupplier.get();
80+
81+
// TODO (ES-11992): implement
82+
83+
}, e -> logger.debug("reroute failed", e)));
84+
} else {
85+
logger.trace("no reroute required");
86+
}
87+
}
88+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.cluster.routing.RerouteService;
1414
import org.elasticsearch.common.Priority;
15+
import org.elasticsearch.common.settings.ClusterSettings;
1516
import org.elasticsearch.common.settings.Setting;
1617
import org.elasticsearch.common.unit.RatioValue;
1718
import org.elasticsearch.core.TimeValue;
@@ -98,4 +99,28 @@ public enum WriteLoadDeciderStatus {
9899
Setting.Property.Dynamic,
99100
Setting.Property.NodeScope
100101
);
102+
103+
WriteLoadDeciderStatus writeLoadDeciderStatus;
104+
TimeValue writeLoadDeciderRerouteIntervalSetting;
105+
106+
WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
107+
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
108+
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
109+
};
110+
111+
private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
112+
this.writeLoadDeciderStatus = status;
113+
}
114+
115+
public WriteLoadDeciderStatus getWriteLoadConstraintEnabled() {
116+
return this.writeLoadDeciderStatus;
117+
}
118+
119+
public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
120+
return this.writeLoadDeciderRerouteIntervalSetting;
121+
}
122+
123+
private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
124+
this.writeLoadDeciderRerouteIntervalSetting = timeValue;
125+
}
101126
}

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.elasticsearch.cluster.routing.RerouteService;
6161
import org.elasticsearch.cluster.routing.allocation.AllocationService;
6262
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
63+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor;
6364
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
6465
import org.elasticsearch.cluster.service.ClusterService;
6566
import org.elasticsearch.cluster.version.CompatibilityVersions;
@@ -785,6 +786,15 @@ private void construct(
785786
)::onNewInfo
786787
);
787788

789+
clusterInfoService.addListener(
790+
new WriteLoadConstraintMonitor(
791+
clusterService.getClusterSettings(),
792+
threadPool.relativeTimeInMillisSupplier(),
793+
clusterService::state,
794+
rerouteService
795+
)::onNewInfo
796+
);
797+
788798
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class).toList());
789799
modules.add(indicesModule);
790800

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import org.elasticsearch.cluster.node.DiscoveryNode;
2222
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
2323
import org.elasticsearch.cluster.node.DiscoveryNodes;
24+
import org.elasticsearch.cluster.routing.RerouteService;
25+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor;
2426
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
2527
import org.elasticsearch.cluster.service.ClusterApplierService;
2628
import org.elasticsearch.cluster.service.ClusterService;
2729
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
2830
import org.elasticsearch.cluster.service.MasterService;
31+
import org.elasticsearch.common.Priority;
2932
import org.elasticsearch.common.settings.ClusterSettings;
3033
import org.elasticsearch.common.settings.Settings;
3134
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
@@ -95,6 +98,18 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
9598
mockEstimatedHeapUsageCollector,
9699
mockNodeUsageStatsForThreadPoolsCollector
97100
);
101+
final WriteLoadConstraintMonitor usageMonitor = spy(
102+
new WriteLoadConstraintMonitor(
103+
clusterService.getClusterSettings(),
104+
threadPool.relativeTimeInMillisSupplier(),
105+
clusterService::state,
106+
new RerouteService() {
107+
@Override
108+
public void reroute(String reason, Priority priority, ActionListener<Void> listener) {}
109+
}
110+
)
111+
);
112+
clusterInfoService.addListener(usageMonitor::onNewInfo);
98113
clusterService.addListener(clusterInfoService);
99114
clusterInfoService.addListener(ignored -> {});
100115

@@ -132,13 +147,15 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
132147
for (int i = 0; i < 3; i++) {
133148
Mockito.clearInvocations(mockEstimatedHeapUsageCollector);
134149
Mockito.clearInvocations(mockNodeUsageStatsForThreadPoolsCollector);
150+
Mockito.clearInvocations(usageMonitor);
135151
final int initialRequestCount = client.requestCount;
136152
final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis();
137153
runFor(deterministicTaskQueue, duration);
138154
deterministicTaskQueue.runAllRunnableTasks();
139155
assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval
140156
verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval
141157
verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(any());
158+
verify(usageMonitor).onNewInfo(any());
142159
}
143160

144161
final AtomicBoolean failMaster2 = new AtomicBoolean();

0 commit comments

Comments
 (0)