Skip to content

Commit 02b7126

Browse files
Collect node thread pool usage for shard balancing
Adds a new transport action to collect usage stats from the data nodes. ClusterInfoService uses the action to pull thread pool usage information from the data nodes to the master node periodically. Sets up a new thread pool usage monitor class to receive new ClusterInfo: in future this class will initiate rebalancing if any data node's write thread pool is hot-spotting. Closes ES-12316
1 parent be4f845 commit 02b7126

File tree

10 files changed

+460
-9
lines changed

10 files changed

+460
-9
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.index.IndexRequest;
1919
import org.elasticsearch.action.search.SearchRequest;
2020
import org.elasticsearch.action.support.IndicesOptions;
21+
import org.elasticsearch.client.internal.Client;
2122
import org.elasticsearch.cluster.ClusterInfoService;
2223
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
2324
import org.elasticsearch.cluster.ClusterState;
@@ -952,7 +953,7 @@ public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPo
952953
}
953954

954955
@Override
955-
public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
956+
public void collectUsageStats(Client client, ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
956957
ActionListener.completeWith(
957958
listener,
958959
() -> plugin.getClusterService()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.node.usage;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.FailedNodeException;
14+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
15+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
16+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
17+
import org.elasticsearch.cluster.ClusterName;
18+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
19+
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.common.io.stream.StreamInput;
21+
import org.elasticsearch.common.io.stream.StreamOutput;
22+
import org.elasticsearch.transport.AbstractTransportRequest;
23+
24+
import java.io.IOException;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Objects;
29+
30+
/**
31+
*
32+
*/
33+
public class NodeUsageStatsForThreadPoolsAction extends ActionType<NodeUsageStatsForThreadPoolsAction.Response> {
34+
public static final NodeUsageStatsForThreadPoolsAction INSTANCE = new NodeUsageStatsForThreadPoolsAction();
35+
public static final String NAME = "internal:monitor/thread_pool/stats";
36+
37+
public NodeUsageStatsForThreadPoolsAction() {
38+
super(NAME);
39+
}
40+
41+
/**
42+
* The request specifying to which data nodes individual {@link NodeRequest} requests should be sent.
43+
*/
44+
public static class Request extends BaseNodesRequest {
45+
public Request() {
46+
super((String[]) null); // send all nodes a request by specifying `null`
47+
}
48+
49+
@Override
50+
public boolean equals(Object o) {
51+
if (this == o) return true;
52+
if (o == null || getClass() != o.getClass()) return false;
53+
return true;
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
// The class doesn't have any members at the moment so return the same hash code
59+
return Objects.hash(NAME);
60+
}
61+
}
62+
63+
/**
64+
* Request sent to the data nodes. No additional parameters to send in the node-specific request.
65+
*/
66+
public static class NodeRequest extends AbstractTransportRequest {
67+
public NodeRequest(StreamInput in) throws IOException {
68+
super(in);
69+
}
70+
71+
public NodeRequest() {}
72+
}
73+
74+
/**
75+
* The collection of {@link NodeUsageStatsForThreadPools} responses from all the data nodes.
76+
*/
77+
public static class Response extends BaseNodesResponse<NodeResponse> {
78+
79+
protected Response(StreamInput in) throws IOException {
80+
super(in);
81+
}
82+
83+
public Response(
84+
ClusterName clusterName,
85+
List<NodeUsageStatsForThreadPoolsAction.NodeResponse> nodeResponses,
86+
List<FailedNodeException> nodeFailures
87+
) {
88+
super(clusterName, nodeResponses, nodeFailures);
89+
}
90+
91+
/**
92+
* Combines the responses from each node that was called into a single map (by node ID) for the final {@link Response}.
93+
*/
94+
public Map<String, NodeUsageStatsForThreadPools> getAllNodeUsageStatsForThreadPools() {
95+
Map<String, NodeUsageStatsForThreadPools> allNodeUsageStatsForThreadPools = new HashMap<>();
96+
for (NodeUsageStatsForThreadPoolsAction.NodeResponse nodeResponse : getNodes()) {
97+
// NOMERGE: Is the nodeID in NodeUsageStatsForThreadPools redundant? What is it useful for? If not, remove?
98+
allNodeUsageStatsForThreadPools.put(
99+
nodeResponse.getNodeUsageStatsForThreadPools().nodeId(),
100+
nodeResponse.getNodeUsageStatsForThreadPools()
101+
);
102+
}
103+
return allNodeUsageStatsForThreadPools;
104+
}
105+
106+
@Override
107+
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodeResponses) throws IOException {
108+
out.writeCollection(nodeResponses);
109+
}
110+
111+
@Override
112+
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
113+
return in.readCollectionAsList(NodeUsageStatsForThreadPoolsAction.NodeResponse::new);
114+
}
115+
116+
@Override
117+
public String toString() {
118+
return "NodeUsageStatsForThreadPoolsAction.Response{" + "NodeUsageStatsForThreadPoolsAction.NodeResponse=" + getNodes() + "}";
119+
}
120+
}
121+
122+
/**
123+
* The {@link NodeUsageStatsForThreadPools} response from a single data node.
124+
*/
125+
public static class NodeResponse extends BaseNodeResponse {
126+
private final NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools;
127+
128+
protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
129+
super(in, node);
130+
this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
131+
}
132+
133+
public NodeResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools) {
134+
super(node);
135+
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
136+
}
137+
138+
public NodeResponse(StreamInput in) throws IOException {
139+
super(in);
140+
this.nodeUsageStatsForThreadPools = new NodeUsageStatsForThreadPools(in);
141+
}
142+
143+
public NodeUsageStatsForThreadPools getNodeUsageStatsForThreadPools() {
144+
return nodeUsageStatsForThreadPools;
145+
}
146+
147+
@Override
148+
public void writeTo(StreamOutput out) throws IOException {
149+
super.writeTo(out);
150+
nodeUsageStatsForThreadPools.writeTo(out);
151+
}
152+
153+
@Override
154+
public boolean equals(Object o) {
155+
if (this == o) return true;
156+
if (o == null || getClass() != o.getClass()) return false;
157+
NodeUsageStatsForThreadPoolsAction.NodeResponse that = (NodeUsageStatsForThreadPoolsAction.NodeResponse) o;
158+
return Objects.equals(this.getNode(), that.getNode())
159+
&& Objects.equals(this.nodeUsageStatsForThreadPools, that.nodeUsageStatsForThreadPools);
160+
}
161+
162+
@Override
163+
public int hashCode() {
164+
return Objects.hash(nodeUsageStatsForThreadPools, getNode());
165+
}
166+
167+
@Override
168+
public String toString() {
169+
return "NodeUsageStatsForThreadPoolsAction.NodeResponse{"
170+
+ "nodeId="
171+
+ getNode().getId()
172+
+ ", nodeUsageStatsForThreadPools="
173+
+ nodeUsageStatsForThreadPools
174+
+ "}";
175+
}
176+
}
177+
178+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.node.usage;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.action.ActionType;
15+
import org.elasticsearch.action.FailedNodeException;
16+
import org.elasticsearch.action.support.ActionFilters;
17+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
18+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
19+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
20+
import org.elasticsearch.cluster.node.DiscoveryNode;
21+
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.Writeable;
24+
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
25+
import org.elasticsearch.tasks.Task;
26+
import org.elasticsearch.threadpool.ThreadPool;
27+
import org.elasticsearch.transport.TransportService;
28+
29+
import java.io.IOException;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.concurrent.Executor;
34+
35+
/**
36+
* Collects some thread pool stats from each data node for purposes of shard allocation balancing. The specific stats are defined in
37+
* {@link NodeUsageStatsForThreadPools}.
38+
*/
39+
public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesAction<
40+
NodeUsageStatsForThreadPoolsAction.Request,
41+
NodeUsageStatsForThreadPoolsAction.Response,
42+
NodeUsageStatsForThreadPoolsAction.NodeRequest,
43+
NodeUsageStatsForThreadPoolsAction.NodeResponse,
44+
Void> {
45+
46+
private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class);
47+
48+
public static final ActionType<NodeUsageStatsForThreadPoolsAction.Response> TYPE = new ActionType<>(
49+
NodeUsageStatsForThreadPoolsAction.NAME
50+
);
51+
52+
private final ThreadPool threadPool;
53+
private final ClusterService clusterService;
54+
55+
/**
56+
* @param actionName action name
57+
* @param clusterService cluster service
58+
* @param transportService transport service
59+
* @param actionFilters action filters
60+
* @param nodeRequest node request reader
61+
* @param executor executor to execute node action and final collection
62+
*/
63+
protected TransportNodeUsageStatsForThreadPoolsAction(
64+
String actionName,
65+
ClusterService clusterService,
66+
TransportService transportService,
67+
ActionFilters actionFilters,
68+
Writeable.Reader<NodeUsageStatsForThreadPoolsAction.NodeRequest> nodeRequest,
69+
Executor executor,
70+
ThreadPool threadPool
71+
) {
72+
super(actionName, clusterService, transportService, actionFilters, nodeRequest, executor);
73+
this.threadPool = threadPool;
74+
this.clusterService = clusterService;
75+
}
76+
77+
@Override
78+
protected NodeUsageStatsForThreadPoolsAction.Response newResponse(
79+
NodeUsageStatsForThreadPoolsAction.Request request,
80+
List<NodeUsageStatsForThreadPoolsAction.NodeResponse> nodeResponses,
81+
List<FailedNodeException> nodeFailures
82+
) {
83+
84+
return new NodeUsageStatsForThreadPoolsAction.Response(clusterService.getClusterName(), nodeResponses, nodeFailures);
85+
}
86+
87+
@Override
88+
protected NodeUsageStatsForThreadPoolsAction.NodeRequest newNodeRequest(NodeUsageStatsForThreadPoolsAction.Request request) {
89+
return new NodeUsageStatsForThreadPoolsAction.NodeRequest();
90+
}
91+
92+
@Override
93+
protected NodeUsageStatsForThreadPoolsAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
94+
return new NodeUsageStatsForThreadPoolsAction.NodeResponse(in);
95+
}
96+
97+
@Override
98+
protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
99+
NodeUsageStatsForThreadPoolsAction.NodeRequest request,
100+
Task task
101+
) {
102+
DiscoveryNode localNode = clusterService.localNode();
103+
var writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
104+
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
105+
var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
106+
107+
ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats(
108+
trackingForWriteExecutor.getMaximumPoolSize(),
109+
(float) trackingForWriteExecutor.pollUtilization(
110+
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
111+
),
112+
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
113+
);
114+
115+
Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
116+
perThreadPool.put(ThreadPool.Names.WRITE, threadPoolUsageStats);
117+
return new NodeUsageStatsForThreadPoolsAction.NodeResponse(
118+
localNode,
119+
new NodeUsageStatsForThreadPools(ThreadPool.Names.WRITE, perThreadPool)
120+
);
121+
}
122+
}

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writ
269269
}
270270

271271
private void fetchNodesUsageStatsForThreadPools() {
272-
nodeUsageStatsForThreadPoolsCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() {
272+
nodeUsageStatsForThreadPoolsCollector.collectUsageStats(client, ActionListener.releaseAfter(new ActionListener<>() {
273273
@Override
274274
public void onResponse(Map<String, NodeUsageStatsForThreadPools> writeLoads) {
275275
nodeThreadPoolUsageStatsPerNode = writeLoads;

server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,25 @@
1010
package org.elasticsearch.cluster;
1111

1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.client.internal.Client;
1314

1415
import java.util.Map;
1516

1617
/**
17-
* Collects the usage stats (like write thread pool load) estimations for each node in the cluster.
18+
* Collects the thread pool usage stats for each node in the cluster.
1819
* <p>
1920
* Results are returned as a map of node ID to node usage stats.
2021
*/
2122
public interface NodeUsageStatsForThreadPoolsCollector {
2223
/**
2324
* This will be used when there is no NodeUsageLoadCollector available.
2425
*/
25-
NodeUsageStatsForThreadPoolsCollector EMPTY = listener -> listener.onResponse(Map.of());
26+
NodeUsageStatsForThreadPoolsCollector EMPTY = (client, listener) -> listener.onResponse(Map.of());
2627

2728
/**
28-
* Collects the write load estimates from the cluster.
29+
* Collects the thread pool usage stats ({@link NodeUsageStatsForThreadPools}) for each node in the cluster.
2930
*
30-
* @param listener The listener to receive the write load results.
31+
* @param listener The listener to receive the usage results.
3132
*/
32-
void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener);
33+
void collectUsageStats(Client client, ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener);
3334
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
14+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
15+
import org.elasticsearch.client.internal.Client;
16+
17+
import java.util.Map;
18+
19+
// TODO: replace the NodeUsageStatsForThreadPoolsCollector interface with this class.
20+
public class NodeUsageStatsForThreadPoolsCollectorImpl implements NodeUsageStatsForThreadPoolsCollector {
21+
22+
@Override
23+
public void collectUsageStats(Client client, ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
24+
client.execute(
25+
TransportNodeUsageStatsForThreadPoolsAction.TYPE,
26+
new NodeUsageStatsForThreadPoolsAction.Request(),
27+
listener.map(response -> response.getAllNodeUsageStatsForThreadPools())
28+
);
29+
}
30+
}

0 commit comments

Comments
 (0)