28
28
import org .elasticsearch .common .settings .ClusterSettings ;
29
29
import org .elasticsearch .common .settings .Settings ;
30
30
import org .elasticsearch .common .transport .TransportAddress ;
31
+ import org .elasticsearch .common .util .set .Sets ;
31
32
import org .elasticsearch .core .TimeValue ;
32
33
import org .elasticsearch .index .Index ;
33
34
import org .elasticsearch .index .IndexSettings ;
34
35
import org .elasticsearch .index .IndexVersion ;
36
+ import org .elasticsearch .test .ClusterServiceUtils ;
35
37
import org .elasticsearch .test .ESTestCase ;
36
38
import org .elasticsearch .test .NodeRoles ;
39
+ import org .elasticsearch .test .hamcrest .ElasticsearchAssertions ;
37
40
import org .elasticsearch .threadpool .TestThreadPool ;
38
41
import org .elasticsearch .threadpool .ThreadPool ;
39
42
import org .elasticsearch .xpack .core .ilm .CheckShrinkReadyStep ;
@@ -348,7 +351,6 @@ public void testExceptionStillProcessesOtherIndicesOnMaster() {
348
351
doTestExceptionStillProcessesOtherIndices (true );
349
352
}
350
353
351
- @ SuppressWarnings ("unchecked" )
352
354
public void doTestExceptionStillProcessesOtherIndices (boolean useOnMaster ) {
353
355
String policy1 = randomAlphaOfLengthBetween (1 , 20 );
354
356
Step .StepKey i1currentStepKey = randomStepKey ();
@@ -377,7 +379,7 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
377
379
}
378
380
MockAction mockAction = new MockAction (List .of (i2mockStep ));
379
381
Phase i2phase = new Phase ("phase" , TimeValue .ZERO , Map .of ("action" , mockAction ));
380
- LifecyclePolicy i2policy = newTestLifecyclePolicy (policy1 , Map .of (i2phase .getName (), i1phase ));
382
+ LifecyclePolicy i2policy = newTestLifecyclePolicy (policy2 , Map .of (i2phase .getName (), i2phase ));
381
383
Index index2 = new Index (
382
384
randomValueOtherThan (index1 .getName (), () -> randomAlphaOfLengthBetween (1 , 20 )),
383
385
randomAlphaOfLengthBetween (1 , 20 )
@@ -403,8 +405,8 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
403
405
((IndexLifecycleRunnerTests .MockClusterStateActionStep ) i1mockStep ).setException (
404
406
failStep1 ? new IllegalArgumentException ("forcing a failure for index 1" ) : null
405
407
);
406
- ((IndexLifecycleRunnerTests .MockClusterStateActionStep ) i1mockStep ).setLatch (stepLatch );
407
- ((IndexLifecycleRunnerTests .MockClusterStateActionStep ) i1mockStep ).setException (
408
+ ((IndexLifecycleRunnerTests .MockClusterStateActionStep ) i2mockStep ).setLatch (stepLatch );
409
+ ((IndexLifecycleRunnerTests .MockClusterStateActionStep ) i2mockStep ).setException (
408
410
failStep1 ? null : new IllegalArgumentException ("forcing a failure for index 2" )
409
411
);
410
412
}
@@ -420,7 +422,7 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
420
422
.numberOfReplicas (randomIntBetween (0 , 5 ))
421
423
.build ();
422
424
IndexMetadata i2indexMetadata = IndexMetadata .builder (index2 .getName ())
423
- .settings (settings (IndexVersion .current ()).put (LifecycleSettings .LIFECYCLE_NAME , policy1 ))
425
+ .settings (settings (IndexVersion .current ()).put (LifecycleSettings .LIFECYCLE_NAME , policy2 ))
424
426
.putCustom (ILM_CUSTOM_METADATA_KEY , i2lifecycleState .build ().asMap ())
425
427
.numberOfShards (randomIntBetween (1 , 5 ))
426
428
.numberOfReplicas (randomIntBetween (0 , 5 ))
@@ -433,23 +435,46 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
433
435
.persistentSettings (settings (IndexVersion .current ()).build ())
434
436
.build ();
435
437
438
+ Settings settings = Settings .builder ().put (LifecycleSettings .LIFECYCLE_POLL_INTERVAL , "1s" ).build ();
439
+ var clusterSettings = new ClusterSettings (
440
+ settings ,
441
+ Sets .union (ClusterSettings .BUILT_IN_CLUSTER_SETTINGS , Set .of (LifecycleSettings .LIFECYCLE_POLL_INTERVAL_SETTING ))
442
+ );
443
+ ClusterService clusterService = ClusterServiceUtils .createClusterService (threadPool , clusterSettings );
444
+ DiscoveryNode node = clusterService .localNode ();
436
445
ClusterState currentState = ClusterState .builder (ClusterName .DEFAULT )
437
446
.metadata (metadata )
438
- .nodes (DiscoveryNodes .builder ().localNodeId ( nodeId ).masterNodeId (nodeId ). add ( masterNode ). build ( ))
447
+ .nodes (DiscoveryNodes .builder ().add ( node ).masterNodeId (node . getId ()). localNodeId ( node . getId () ))
439
448
.build ();
449
+ ClusterServiceUtils .setState (clusterService , currentState );
450
+
451
+ indexLifecycleService = new IndexLifecycleService (
452
+ Settings .EMPTY ,
453
+ mock (Client .class ),
454
+ clusterService ,
455
+ threadPool ,
456
+ systemUTC (),
457
+ () -> now ,
458
+ null ,
459
+ null ,
460
+ null
461
+ );
440
462
463
+ ClusterChangedEvent event = new ClusterChangedEvent ("_source" , currentState , ClusterState .EMPTY_STATE );
464
+ indexLifecycleService .applyClusterState (event );
441
465
if (useOnMaster ) {
442
- when (clusterService .state ()).thenReturn (currentState );
443
466
indexLifecycleService .onMaster (currentState );
444
467
} else {
445
- indexLifecycleService .triggerPolicies (currentState , randomBoolean ());
468
+ // TODO: this relies on the master task queue
469
+ indexLifecycleService .triggerPolicies (currentState , true );
446
470
}
447
471
try {
448
- stepLatch . await ( 5 , TimeUnit .SECONDS );
472
+ ElasticsearchAssertions . awaitLatch ( stepLatch , 5 , TimeUnit .SECONDS );
449
473
} catch (InterruptedException e ) {
450
474
logger .error ("failure while waiting for step execution" , e );
451
475
fail ("both steps should have been executed, even with an exception" );
452
476
}
477
+ clusterService .close ();
453
478
}
454
479
455
480
public void testClusterChangedWaitsForTheStateToBeRecovered () {
0 commit comments