@@ -2576,6 +2576,191 @@ func TestReleasedSequenceRangeHandlingEdgeCase1(t *testing.T) {
2576
2576
}, time .Second * 10 , time .Millisecond * 100 )
2577
2577
}
2578
2578
2579
+ // TestReleasedSequenceRangeHandlingEdgeCase2:
2580
+ // - Test releasing unused sequence range with everything startSeq to endSeq - 1 in skipped
2581
+ // - Test the handling at the cache when endSeq on unused sequence range is in the pending list
2582
+ // - This test is lower level version of TestJumpInSequencesAtAllocatorSkippedSequenceFill
2583
+ func TestReleasedSequenceRangeHandlingEdgeCase2 (t * testing.T ) {
2584
+ base .SetUpTestLogging (t , base .LevelDebug , base .KeyCache )
2585
+
2586
+ ctx := base .TestCtx (t )
2587
+ bucket := base .GetTestBucket (t )
2588
+ dbContext , err := NewDatabaseContext (ctx , "db" , bucket , false , DatabaseContextOptions {
2589
+ Scopes : GetScopesOptions (t , bucket , 1 ),
2590
+ })
2591
+ require .NoError (t , err )
2592
+ defer dbContext .Close (ctx )
2593
+
2594
+ ctx = dbContext .AddDatabaseLogContext (ctx )
2595
+ err = dbContext .StartOnlineProcesses (ctx )
2596
+ require .NoError (t , err )
2597
+
2598
+ testChangeCache := & changeCache {}
2599
+ if err := testChangeCache .Init (ctx , dbContext , dbContext .channelCache , nil , & CacheOptions {
2600
+ CachePendingSeqMaxWait : 100 * time .Millisecond ,
2601
+ CacheSkippedSeqMaxWait : 20 * time .Minute ,
2602
+ CachePendingSeqMaxNum : 1 ,
2603
+ }, dbContext .MetadataKeys ); err != nil {
2604
+ log .Printf ("Init failed for testChangeCache: %v" , err )
2605
+ t .Fail ()
2606
+ }
2607
+
2608
+ if err := testChangeCache .Start (0 ); err != nil {
2609
+ log .Printf ("Start error for testChangeCache: %v" , err )
2610
+ t .Fail ()
2611
+ }
2612
+ defer testChangeCache .Stop (ctx )
2613
+ require .NoError (t , err )
2614
+
2615
+ // process change that should push pending and subsequently be cached
2616
+ entry := & LogEntry {
2617
+ Sequence : 20 ,
2618
+ DocID : fmt .Sprintf ("doc_%d" , 50 ),
2619
+ RevID : "1-abcdefabcdefabcdef" ,
2620
+ TimeReceived : channels .NewFeedTimestampFromNow (),
2621
+ }
2622
+ _ = testChangeCache .processEntry (ctx , entry )
2623
+
2624
+ // assert that the skipped list is filled
2625
+ require .EventuallyWithT (t , func (c * assert.CollectT ) {
2626
+ testChangeCache .updateStats (ctx )
2627
+ assert .Equal (c , int64 (1 ), dbContext .DbStats .CacheStats .SkippedSeqLen .Value ())
2628
+ assert .Equal (c , int64 (19 ), dbContext .DbStats .CacheStats .NumCurrentSeqsSkipped .Value ())
2629
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .PendingSeqLen .Value ())
2630
+ dbContext .UpdateCalculatedStats (ctx )
2631
+ assert .Equal (c , int64 (20 ), dbContext .DbStats .CacheStats .HighSeqCached .Value ())
2632
+ }, time .Second * 10 , time .Millisecond * 100 )
2633
+
2634
+ // process unusedSeq range with pending seq equal to end
2635
+ testChangeCache .releaseUnusedSequenceRange (ctx , 1 , 20 , channels .NewFeedTimestampFromNow ())
2636
+
2637
+ // assert that the pending list is empty + high seq cached + next seq is as expected
2638
+ require .EventuallyWithT (t , func (c * assert.CollectT ) {
2639
+ testChangeCache .updateStats (ctx )
2640
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .PendingSeqLen .Value ())
2641
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .SkippedSeqLen .Value ())
2642
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .NumCurrentSeqsSkipped .Value ())
2643
+ assert .Equal (c , uint64 (21 ), testChangeCache .nextSequence )
2644
+ dbContext .UpdateCalculatedStats (ctx )
2645
+ assert .Equal (c , int64 (20 ), dbContext .DbStats .CacheStats .HighSeqCached .Value ())
2646
+ }, time .Second * 10 , time .Millisecond * 100 )
2647
+ }
2648
+
2649
+ // TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped:
2650
+ // - Test releasing unused sequence range that has duplicate sequences in skipped sequence list
2651
+ // - Assert sequences in the range are removed from skipped
2652
+ // - Empty skipped through unused sequence release
2653
+ // - Add new contiguous sequence and assert it is processed correctly
2654
+ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped (t * testing.T ) {
2655
+ base .SetUpTestLogging (t , base .LevelDebug , base .KeyCache )
2656
+
2657
+ ctx := base .TestCtx (t )
2658
+ bucket := base .GetTestBucket (t )
2659
+ dbContext , err := NewDatabaseContext (ctx , "db" , bucket , false , DatabaseContextOptions {
2660
+ Scopes : GetScopesOptions (t , bucket , 1 ),
2661
+ })
2662
+ require .NoError (t , err )
2663
+ defer dbContext .Close (ctx )
2664
+
2665
+ ctx = dbContext .AddDatabaseLogContext (ctx )
2666
+ err = dbContext .StartOnlineProcesses (ctx )
2667
+ require .NoError (t , err )
2668
+
2669
+ testChangeCache := & changeCache {}
2670
+ if err := testChangeCache .Init (ctx , dbContext , dbContext .channelCache , nil , & CacheOptions {
2671
+ CachePendingSeqMaxWait : 20 * time .Minute ,
2672
+ CacheSkippedSeqMaxWait : 20 * time .Minute ,
2673
+ CachePendingSeqMaxNum : 0 ,
2674
+ }, dbContext .MetadataKeys ); err != nil {
2675
+ log .Printf ("Init failed for testChangeCache: %v" , err )
2676
+ t .Fail ()
2677
+ }
2678
+
2679
+ if err := testChangeCache .Start (0 ); err != nil {
2680
+ log .Printf ("Start error for testChangeCache: %v" , err )
2681
+ t .Fail ()
2682
+ }
2683
+ defer testChangeCache .Stop (ctx )
2684
+ require .NoError (t , err )
2685
+
2686
+ // push two entries that will be pushed to pending and subsequently skipped will be filled with sequence gaps
2687
+ entry := & LogEntry {
2688
+ Sequence : 14 ,
2689
+ DocID : fmt .Sprintf ("doc_%d" , 50 ),
2690
+ RevID : "1-abcdefabcdefabcdef" ,
2691
+ TimeReceived : channels .NewFeedTimestampFromNow (),
2692
+ }
2693
+ _ = testChangeCache .processEntry (ctx , entry )
2694
+
2695
+ entry = & LogEntry {
2696
+ Sequence : 18 ,
2697
+ DocID : fmt .Sprintf ("doc_%d" , 50 ),
2698
+ RevID : "1-abcdefabcdefabcdef" ,
2699
+ TimeReceived : channels .NewFeedTimestampFromNow (),
2700
+ }
2701
+ _ = testChangeCache .processEntry (ctx , entry )
2702
+
2703
+ // assert skipped is filled
2704
+ require .EventuallyWithT (t , func (c * assert.CollectT ) {
2705
+ testChangeCache .updateStats (ctx )
2706
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .PendingSeqLen .Value ())
2707
+ assert .Equal (c , int64 (2 ), dbContext .DbStats .CacheStats .SkippedSeqLen .Value ())
2708
+ assert .Equal (c , int64 (16 ), dbContext .DbStats .CacheStats .NumCurrentSeqsSkipped .Value ())
2709
+ assert .Equal (c , uint64 (19 ), testChangeCache .nextSequence )
2710
+ dbContext .UpdateCalculatedStats (ctx )
2711
+ assert .Equal (c , int64 (18 ), dbContext .DbStats .CacheStats .HighSeqCached .Value ())
2712
+ }, time .Second * 10 , time .Millisecond * 100 )
2713
+
2714
+ // process unusedSeq range with range containing duplicate sipped sequences
2715
+ // Skipped should contain: (1-13), (15-17) before processing this range
2716
+ testChangeCache .releaseUnusedSequenceRange (ctx , 10 , 17 , channels .NewFeedTimestampFromNow ())
2717
+
2718
+ // assert skipped list altered to reflect the above range is processed
2719
+ require .EventuallyWithT (t , func (c * assert.CollectT ) {
2720
+ testChangeCache .updateStats (ctx )
2721
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .PendingSeqLen .Value ())
2722
+ assert .Equal (c , int64 (1 ), dbContext .DbStats .CacheStats .SkippedSeqLen .Value ())
2723
+ assert .Equal (c , int64 (9 ), dbContext .DbStats .CacheStats .NumCurrentSeqsSkipped .Value ())
2724
+ assert .Equal (c , uint64 (19 ), testChangeCache .nextSequence )
2725
+ dbContext .UpdateCalculatedStats (ctx )
2726
+ assert .Equal (c , int64 (18 ), dbContext .DbStats .CacheStats .HighSeqCached .Value ())
2727
+ }, time .Second * 10 , time .Millisecond * 100 )
2728
+
2729
+ // Skipped should contain: (1-9) before processing this range
2730
+ testChangeCache .releaseUnusedSequenceRange (ctx , 1 , 9 , channels .NewFeedTimestampFromNow ())
2731
+
2732
+ // assert skipped list is emptied after the above range is processed
2733
+ require .EventuallyWithT (t , func (c * assert.CollectT ) {
2734
+ testChangeCache .updateStats (ctx )
2735
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .PendingSeqLen .Value ())
2736
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .SkippedSeqLen .Value ())
2737
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .NumCurrentSeqsSkipped .Value ())
2738
+ assert .Equal (c , uint64 (19 ), testChangeCache .nextSequence )
2739
+ dbContext .UpdateCalculatedStats (ctx )
2740
+ assert .Equal (c , int64 (18 ), dbContext .DbStats .CacheStats .HighSeqCached .Value ())
2741
+ }, time .Second * 10 , time .Millisecond * 100 )
2742
+
2743
+ // assert a new contiguous sequence is processed correctly
2744
+ entry = & LogEntry {
2745
+ Sequence : 19 ,
2746
+ DocID : fmt .Sprintf ("doc_%d" , 50 ),
2747
+ RevID : "1-abcdefabcdefabcdef" ,
2748
+ TimeReceived : channels .NewFeedTimestampFromNow (),
2749
+ }
2750
+ _ = testChangeCache .processEntry (ctx , entry )
2751
+
2752
+ // assert on stats
2753
+ require .EventuallyWithT (t , func (c * assert.CollectT ) {
2754
+ testChangeCache .updateStats (ctx )
2755
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .PendingSeqLen .Value ())
2756
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .SkippedSeqLen .Value ())
2757
+ assert .Equal (c , int64 (0 ), dbContext .DbStats .CacheStats .NumCurrentSeqsSkipped .Value ())
2758
+ assert .Equal (c , uint64 (20 ), testChangeCache .nextSequence )
2759
+ dbContext .UpdateCalculatedStats (ctx )
2760
+ assert .Equal (c , int64 (19 ), dbContext .DbStats .CacheStats .HighSeqCached .Value ())
2761
+ }, time .Second * 10 , time .Millisecond * 100 )
2762
+ }
2763
+
2579
2764
// getChanges is a synchronous convenience function that returns all changes as a simple array. This will fail the test if an error is returned.
2580
2765
func getChanges (t * testing.T , collection * DatabaseCollectionWithUser , channels base.Set , options ChangesOptions ) []* ChangeEntry {
2581
2766
require .NotNil (t , options .ChangesCtx )
0 commit comments