Skip to content

Clean up awaitClusterState overloads #132529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void testSortAndPaginateWithInProgress() throws Exception {
inProgressSnapshots.add(AbstractSnapshotIntegTestCase.startFullSnapshot(logger, repoName, snapshotName, false));
}
AbstractSnapshotIntegTestCase.awaitNumberOfSnapshotsInProgress(logger, inProgressCount);
AbstractSnapshotIntegTestCase.awaitClusterState(logger, state -> {
AbstractSnapshotIntegTestCase.awaitClusterState(state -> {
final var snapshotsInProgress = SnapshotsInProgress.get(state);
boolean firstIndexSuccessfullySnapshot = snapshotsInProgress.asStream()
.flatMap(s -> s.shards().entrySet().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ public void runRepeatedlyWhileChangingMaster(Runnable runnable) throws Exception

final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));
awaitClusterState(
logger,
nonMasterNode,
state -> Optional.ofNullable(state.nodes().getMasterNode()).map(m -> m.getName().equals(masterName) == false).orElse(false)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,6 @@ public void onRequestSent(
final ActionFuture<AcknowledgedResponse> deleteResponse = startDeleteSnapshot(repoName, snapshotName);

awaitClusterState(
logger,
otherDataNode,
state -> SnapshotsInProgress.get(state)
.forRepo(repoName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<St
);
}

protected void awaitNDeletionsInProgress(int count) throws Exception {
protected void awaitNDeletionsInProgress(int count) {
logger.info("--> wait for [{}] deletions to show up in the cluster state", count);
awaitClusterState(state -> SnapshotDeletionsInProgress.get(state).getEntries().size() == count);
}
Expand All @@ -572,7 +572,6 @@ protected void awaitNoMoreRunningOperations() throws Exception {
protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
logger.info("--> verify no more operations in the cluster state");
awaitClusterState(
logger,
viaNode,
state -> SnapshotsInProgress.get(state).isEmpty() && SnapshotDeletionsInProgress.get(state).hasDeletionsInProgress() == false
);
Expand Down Expand Up @@ -607,13 +606,13 @@ public static ActionFuture<CreateSnapshotResponse> startFullSnapshot(
.execute();
}

protected void awaitNumberOfSnapshotsInProgress(int count) throws Exception {
protected void awaitNumberOfSnapshotsInProgress(int count) {
awaitNumberOfSnapshotsInProgress(logger, count);
}

public static void awaitNumberOfSnapshotsInProgress(Logger logger, int count) throws Exception {
public static void awaitNumberOfSnapshotsInProgress(Logger logger, int count) {
logger.info("--> wait for [{}] snapshots to show up in the cluster state", count);
awaitClusterState(logger, state -> SnapshotsInProgress.get(state).count() == count);
awaitClusterState(state -> SnapshotsInProgress.get(state).count() == count);
}

protected SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/
package org.elasticsearch.test;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -239,10 +238,8 @@ public static void setAllElapsedMillis(ClusterStatePublicationEvent clusterState
clusterStatePublicationEvent.setMasterApplyElapsedMillis(0L);
}

public static void awaitClusterState(Logger logger, Predicate<ClusterState> statePredicate, ClusterService clusterService)
throws Exception {
final var listener = addTemporaryStateListener(clusterService, statePredicate, ESTestCase.TEST_REQUEST_TIMEOUT);
ESTestCase.safeAwait(listener, ESTestCase.TEST_REQUEST_TIMEOUT);
public static void awaitClusterState(Predicate<ClusterState> statePredicate, ClusterService clusterService) {
ESTestCase.safeAwait(addTemporaryStateListener(clusterService, statePredicate, TimeValue.THIRTY_SECONDS), TimeValue.THIRTY_SECONDS);
}

public static void awaitNoPendingTasks(ClusterService clusterService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.apache.http.HttpHost;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.tests.util.LuceneTestCase;
Expand Down Expand Up @@ -1215,20 +1214,12 @@ public static PendingClusterTasksResponse getClusterPendingTasks(Client client)
}
}

protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
public static void awaitClusterState(Predicate<ClusterState> statePredicate) {
awaitClusterState(internalCluster().getMasterName(), statePredicate);
}

protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
ClusterServiceUtils.awaitClusterState(logger, statePredicate, internalCluster().getInstance(ClusterService.class, viaNode));
}

public static void awaitClusterState(Logger logger, Predicate<ClusterState> statePredicate) throws Exception {
awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
}

public static void awaitClusterState(Logger logger, String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
ClusterServiceUtils.awaitClusterState(logger, statePredicate, internalCluster().getInstance(ClusterService.class, viaNode));
public static void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) {
ClusterServiceUtils.awaitClusterState(statePredicate, internalCluster().getInstance(ClusterService.class, viaNode));
}

public static String getNodeId(String nodeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,7 @@ public String getMasterName(@Nullable String viaNode) {
throw new AssertionError("Unable to get master name, no node found");
}
try {
ClusterServiceUtils.awaitClusterState(logger, state -> state.nodes().getMasterNode() != null, clusterService(viaNode));
ClusterServiceUtils.awaitClusterState(state -> state.nodes().getMasterNode() != null, clusterService(viaNode));
final ClusterState state = client(viaNode).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
final DiscoveryNode masterNode = state.nodes().getMasterNode();
if (masterNode == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean

// The cluster state can take a few extra milliseconds to update after the steps are executed
ClusterServiceUtils.awaitClusterState(
logger,
s -> s.metadata().getProject(state.projectId()).index(indexMetadata.getIndex()).getLifecycleExecutionState().stepInfo() != null,
clusterService
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,7 @@ public void tearDown() throws Exception {

protected void waitForMlTemplates() throws Exception {
// block until the templates are installed
ClusterServiceUtils.awaitClusterState(
logger,
MachineLearning::criticalTemplatesInstalled,
getInstanceFromNode(ClusterService.class)
);
ClusterServiceUtils.awaitClusterState(MachineLearning::criticalTemplatesInstalled, getInstanceFromNode(ClusterService.class));
}

protected <T> void blockingCall(Consumer<ActionListener<T>> function, AtomicReference<T> response, AtomicReference<Exception> error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
}

@Before
public void ensureTemplatesArePresent() throws Exception {
public void ensureTemplatesArePresent() {
if (cluster().size() > 0) {
awaitClusterState(logger, MachineLearning::criticalTemplatesInstalled);
awaitClusterState(MachineLearning::criticalTemplatesInstalled);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testMultipleNodes() throws Exception {
});
}

private void waitForMonitoringIndices() throws Exception {
private void waitForMonitoringIndices() {
final var indexNameExpressionResolver = internalCluster().getCurrentMasterNodeInstance(IndexNameExpressionResolver.class);
final var indicesOptions = IndicesOptions.builder()
.wildcardOptions(IndicesOptions.WildcardOptions.builder().allowEmptyExpressions(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,7 @@ private static IndexMetadata getIndexMetadata(String indexName) {
.index(indexName);
}

private void waitUntilAllShardsAreUnassigned(Index index) throws Exception {
private void waitUntilAllShardsAreUnassigned(Index index) {
awaitClusterState(state -> state.getRoutingTable().index(index).allPrimaryShardsUnassigned());
}

Expand Down