Simulate impact of shard movement using shard-level write load #131406


Conversation

@nicktindall (Contributor) commented Jul 17, 2025

I've been back and forth on this a bit, but I think going for something simple is best. When we start receiving shard write load estimates from the nodes, we should be able to plug those in and this should "just work" (assuming I've understood shard-level write load correctly).

We ignore queue latency in the modelling because I don't think we're going to look at it in the decider, and I can't see how we could estimate how it would change in response to shard movements (it's a function of the amount the node is overloaded AND how long it's been like that, and back-pressure should ideally keep a lid on it).

@nicktindall added the >non-issue and :Distributed Coordination/Allocation labels Jul 17, 2025
@elasticsearchmachine added the Team:Distributed Coordination and v9.2.0 labels Jul 17, 2025
@elasticsearchmachine (Collaborator) commented:

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@nicktindall requested a review from mhl-b July 17, 2025 06:11

public class WriteLoadPerShardSimulator {

    private final ObjectFloatMap<String> writeLoadDeltas;
Contributor:

nit: simulatedNodesLoad?

@nicktindall (author):

👍 I changed to simulatedWriteLoadDeltas, we only store the delta from the reported/original write load here. The idea there is that if no delta is present, we can just return the original NodeUsageStatsForThreadPools instance.

    }
}
writeShardsOnNode.forEach(
    shardId -> writeLoadPerShard.computeIfAbsent(shardId, k -> new Average()).add(writeUtilisation / writeShardsOnNode.size())
Contributor:

Do you equally divide the write load across all write shards on the node?

@nicktindall (author):

Yeah, this is just a stop-gap until we get actual shard loads, which should work as a drop-in replacement.
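
For illustration, a self-contained sketch of that even split (ShardId and Average here are simplified stand-ins for the Elasticsearch types, not the PR's code):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class EvenSplitSketch {
        record ShardId(String index, int id) {}

        // Simplified stand-in for the Average accumulator used in the PR.
        static class Average {
            private double sum;
            private long count;
            void add(double value) { sum += value; count++; }
            double get() { return count == 0 ? 0.0 : sum / count; }
        }

        public static void main(String[] args) {
            double writeUtilisation = 0.75; // node-level write pool utilisation
            List<ShardId> writeShardsOnNode = List.of(new ShardId("logs", 0), new ShardId("logs", 1));

            Map<ShardId, Average> writeLoadPerShard = new HashMap<>();
            writeShardsOnNode.forEach(
                shardId -> writeLoadPerShard
                    .computeIfAbsent(shardId, k -> new Average())
                    .add(writeUtilisation / writeShardsOnNode.size())
            );
            // Each shard is attributed 0.75 / 2 = 0.375 of the node's utilisation;
            // averaging smooths the estimate across samples.
            writeLoadPerShard.forEach((shard, avg) -> System.out.println(shard + " -> " + avg.get()));
        }
    }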

@mhl-b (Contributor) commented Jul 17, 2025:

OK, I was thinking maybe we should have some heuristic based on already-available data; otherwise the signal-to-noise ratio is too low. It's not uncommon to have hundreds of shards, and an even split attributes little to no load to any single shard.

For example, use a shard-size heuristic: the larger the shard, the more likely it is to carry write load. Say we linearly increase the weight of a shard as its size approaches 15GB, then decrease it as the size approaches 30GB, since we would (most of the time) roll the shard over: if size < 15GB then size/15GB else max(0, 1 - size/30GB).
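
As written, that weight function might look like the following sketch (the 15GB/30GB thresholds come from the comment above; note that, taken literally, the weight drops from ~1.0 to 0.5 at the 15GB boundary):

    // Sketch of the proposed size-based heuristic: weight ramps up linearly
    // towards 15GB, then tapers off towards 30GB where rollover is expected.
    static double writeLoadWeight(double sizeGb) {
        if (sizeGb < 15.0) {
            return sizeGb / 15.0;                  // size/15GB
        }
        return Math.max(0.0, 1.0 - sizeGb / 30.0); // max(0, 1 - size/30GB)
    }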

@nicktindall (author):

We'll have actual shard write loads shortly. Hopefully we can avoid all this guessing entirely.

#131496

@mhl-b (Contributor) left a comment:

LGTM

@nicktindall (author):

I might hold off merging until we get #131496 merged; I think we can then avoid fudging the shard write loads.

@nicktindall changed the title from "Estimate impact of shard movement using node-level write load" to "Simulate impact of shard movement using shard-level write load" Jul 21, 2025
@DiannaHohensee (Contributor) left a comment:

I left one comment where I'm concerned there might be a bug, but the other requests are just improvements.

> We ignore queue latency in the modelling because I don't think we're going to look at it in the decider, and I can't see how we could estimate how it would change in response to shard movements (it's a function of the amount the node is overloaded AND how long it's been like that, and back-pressure should ideally keep a lid on it).

I was originally imagining that we could (in future, not now) collect some per-shard stats for queuing, and make some kind of estimate of additional shard write load based on that, like auto-scaling except at the shard instead of the node level. But it may turn out that we don't need something like that; we'll probably see how it goes in production. And I haven't explored the feasibility of collecting a stat like that.

Alternatively, we could choose to be more generous with how much write load is moved away from a node, based on the queue latency: we don't know how much load to attribute to a particular shard, but we could extrapolate that when the queue latency is X seconds, we then need to move off X seconds of additional thread execution time translated into shard write load (which is thread execution time).
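
A hypothetical sketch of that second idea, with purely illustrative names (nothing here is from the PR):

    // Treat X seconds of observed queue latency as X extra execution-seconds
    // that must be shed, expressed as write load (average busy threads) over
    // an assumed observation window.
    static double extraWriteLoadToShed(double queueLatencySeconds, double observationWindowSeconds) {
        // e.g. 6s of latency over a 60s window ~= 0.1 extra busy threads to move off
        return queueLatencySeconds / observationWindowSeconds;
    }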


public class WriteLoadPerShardSimulator {

    private final ObjectDoubleMap<String> simulatedWriteLoadDeltas;
Contributor:

Is there a performance gain over Map<String, Double>? I'm wondering why use it, essentially.

@mhl-b (Contributor) commented Jul 23, 2025:

Because Double is a boxed value, a pointer to a heap-allocated double (https://docs.oracle.com/javase/tutorial/java/data/autoboxing.html). Unfortunately. This is why you see separate classes working with generic (boxed) and primitive collections, for performance reasons. Boxed values are tracked by the GC, take more space (pointer plus value), and require a dereference. And can be null :(

IntStream s;            // stream of int rather than Integer
LongStream s;           // stream of long
Arrays.binarySearch();  // 17 overloads covering each primitive plus the generic (boxed) form
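
To make the contrast concrete, a small sketch assuming the carrotsearch HPPC library that provides the ObjectDoubleMap seen in the code context above:

    import com.carrotsearch.hppc.ObjectDoubleHashMap;
    import com.carrotsearch.hppc.ObjectDoubleMap;

    import java.util.HashMap;
    import java.util.Map;

    class BoxedVsPrimitive {
        static void demo() {
            Map<String, Double> boxed = new HashMap<>();
            boxed.put("node_0", 0.25);      // autoboxes 0.25 into a heap-allocated Double
            double b = boxed.get("node_0"); // unboxes; would NPE if the key were absent

            ObjectDoubleMap<String> primitive = new ObjectDoubleHashMap<>();
            primitive.put("node_0", 0.25);       // stores the primitive double inline
            double p = primitive.get("node_0");  // returns 0.0 for a missing key, never null
        }
    }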

Contributor:

https://openjdk.org/jeps/402 - somewhere in the future, when Brian Goetz celebrates his 80th birthday, I guess :D

@nicktindall (author):

If they can somehow evolve their way to "valhalla" without breaking backward compatibility and without making the language horribly inconsistent that'll be a marvel of modern engineering.

.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
.collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> randomDoubleBetween(0.1, 5.0, true)))
)
.build();
Contributor:

Could you log the ClusterInfo as a string? There isn't any debug information to look at if any of the tests fail (I think?), and some logging of the values might help.

@nicktindall (author):

I don't think that will add value here: a lot of numbers go into the calculations, all the values are randomised, and it's a unit test with no concurrency, so failures should be reliably reproducible with the seed. I would like to leave the logging to whoever's troubleshooting the failure.

final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);

// Relocate a random shard from node_0 to node_1
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
Contributor:

Log randomShard? For debug purposes; then we can match it with the ClusterInfo I suggested logging in createRoutingAllocation.

@nicktindall (author):

Again, given that this is a unit test with no concurrency, any failure should be reliably reproducible. I'm going to skip logging here, on the assumption that someone investigating a failure can log the things they're interested in.


public void simulateShardStarted(ShardRouting shardRouting) {
    final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
    if (writeLoadForShard != null) {
Contributor:

Could you test the case where a shard write load is 0/null, as would be reported for a non-data-stream index shard?

@nicktindall (author):

This will be covered by an existing test, I renamed it to make that clearer in 58e84a2.

That test randomly nullifies the thread pool stats or the write load stats for the objects in the test.
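
Roughly the pattern being described, as a sketch (randomUsageStats is a hypothetical helper; randomBoolean/randomDoubleBetween are the usual ESTestCase-style utilities seen elsewhere in this PR):

    // Randomly nullify thread pool stats and zero out write loads so that the
    // simulator's null/zero handling is exercised by the same test.
    NodeUsageStatsForThreadPools threadPoolStats = randomBoolean() ? null : randomUsageStats();
    double shardWriteLoad = randomBoolean() ? 0.0 : randomDoubleBetween(0.1, 5.0, true);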

@nicktindall (author):

I also increased the likelihood of the utilisation numbers and write loads of being zeros in a072aaf

We also will not return a simulated utilisation below 0.0, because that would be nonsense.

Contributor:

Oh, nice change to max(X, 0.0) to avoid the negative numbers

It seems possible for the node-level and shard-level write loads not to line up exactly. E.g. the latest node-level write load reported for nodeA is 0, while it holds a shardA whose peak shard write load is >0. If we then move shardA away from nodeA, we'd go negative. Shards can be moved for reasons other than write load balancing, so maybe it could happen 🧐

@nicktindall (author):

Yeah, I think that's something we'll have to live with. I don't think it should matter too much for the purpose of the simulation.
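
The clamp under discussion, as a one-line sketch (illustrative names, not the PR's code):

    // Never report negative utilisation, even when a stale node-level reading
    // plus a departing shard's negative delta would dip below zero.
    static double simulatedUtilisation(double reportedUtilisation, double writeLoadDelta) {
        return Math.max(0.0, reportedUtilisation + writeLoadDelta);
    }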

@DiannaHohensee (Contributor) left a comment:

Sorry for the delay -- I didn't quite manage to finish reviewing on Friday.

LGTM 👍 Just some minor comments. Nice to have this piece :)

Maps.copyMapWithAddedOrReplacedEntry(
    entry.getValue().threadPoolUsageStatsMap(),
    "write",
    replaceWritePoolStats(entry.getValue(), simulatedWriteLoadDeltas.get(entry.getKey()))
Contributor:

Suggested change:
- replaceWritePoolStats(entry.getValue(), simulatedWriteLoadDeltas.get(entry.getKey()))
+ replaceWritePoolStats(entry.getValue(), simulatedNodeWriteLoadDeltas.get(entry.getKey()))

private final Map<ShardId, Double> writeLoadsPerShard;

public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) {
    this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools();
Contributor:

If a copy were made here, there would be no need to generate the node stats on demand. The original stats are never needed again in their original form.

The only reason to keep it I can think of would be stats reporting down the line -- e.g. what did this balancer round accomplish.

Anyway, more of a note than a request. Doesn't seem like a big deal.

@nicktindall (author):

I kept the originals + deltas so that we can just return the unmodified NodeUsageStatsForThreadPools for any node that wasn't involved in shard movement. I think this should keep garbage to a minimum. Also, if we kept adding and subtracting the write load values we might introduce rounding errors from the floating-point arithmetic.
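
A sketch of that originals-plus-deltas shape (illustrative names; NodeStats stands in for NodeUsageStatsForThreadPools, and the HPPC primitive map is assumed):

    import com.carrotsearch.hppc.ObjectDoubleHashMap;
    import com.carrotsearch.hppc.ObjectDoubleMap;

    import java.util.Map;

    class OriginalsPlusDeltasSketch {
        record NodeStats(double writeUtilisation) {}

        private final Map<String, NodeStats> original;              // reported stats, never mutated
        private final ObjectDoubleMap<String> deltas = new ObjectDoubleHashMap<>();

        OriginalsPlusDeltasSketch(Map<String, NodeStats> original) {
            this.original = original;
        }

        void simulateMove(String fromNode, String toNode, double shardWriteLoad) {
            deltas.addTo(fromNode, -shardWriteLoad);
            deltas.addTo(toNode, shardWriteLoad);
        }

        NodeStats statsFor(String nodeId) {
            NodeStats stats = original.get(nodeId);
            if (deltas.containsKey(nodeId) == false) {
                return stats; // untouched node: return the original instance, no garbage
            }
            // Apply the accumulated delta once, clamped at zero.
            return new NodeStats(Math.max(0.0, stats.writeUtilisation() + deltas.get(nodeId)));
        }
    }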

@nicktindall merged commit ab2e654 into elastic:main Jul 29, 2025
33 checks passed
@nicktindall deleted the ES-12000_add_write_load_modeling_to_balancer branch July 29, 2025 04:39
szybia added a commit to szybia/elasticsearch that referenced this pull request Jul 29, 2025