@@ -98,36 +98,7 @@ func (c *changeCache) updateStats(ctx context.Context) {
98
98
c .db .DbStats .Cache ().SkippedSeqCap .Set (skippedSequenceListStats .ListCapacityStat )
99
99
}
100
100
101
- type LogEntry channels.LogEntry
102
-
103
- func (l LogEntry ) String () string {
104
- return channels .LogEntry (l ).String ()
105
- }
106
-
107
- func (entry * LogEntry ) IsRemoved () bool {
108
- return entry .Flags & channels .Removed != 0
109
- }
110
-
111
- func (entry * LogEntry ) IsDeleted () bool {
112
- return entry .Flags & channels .Deleted != 0
113
- }
114
-
115
- // Returns false if the entry is either a removal or a delete
116
- func (entry * LogEntry ) IsActive () bool {
117
- return ! entry .IsRemoved () && ! entry .IsDeleted ()
118
- }
119
-
120
- func (entry * LogEntry ) SetRemoved () {
121
- entry .Flags |= channels .Removed
122
- }
123
-
124
- func (entry * LogEntry ) SetDeleted () {
125
- entry .Flags |= channels .Deleted
126
- }
127
-
128
- func (entry * LogEntry ) IsUnusedRange () bool {
129
- return entry .UnusedSequence && entry .EndSequence > 0
130
- }
101
+ type LogEntry = channels.LogEntry
131
102
132
103
type LogEntries []* LogEntry
133
104
@@ -332,19 +303,20 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
332
303
docJSON := event .Value
333
304
changedChannelsCombined := channels.Set {}
334
305
306
+ timeReceived := channels .NewFeedTimestamp (& event .TimeReceived )
335
307
// ** This method does not directly access any state of c, so it doesn't lock.
336
308
// Is this a user/role doc for this database?
337
309
if strings .HasPrefix (docID , c .metaKeys .UserKeyPrefix ()) {
338
- c .processPrincipalDoc (ctx , docID , docJSON , true , event . TimeReceived )
310
+ c .processPrincipalDoc (ctx , docID , docJSON , true , timeReceived )
339
311
return
340
312
} else if strings .HasPrefix (docID , c .metaKeys .RoleKeyPrefix ()) {
341
- c .processPrincipalDoc (ctx , docID , docJSON , false , event . TimeReceived )
313
+ c .processPrincipalDoc (ctx , docID , docJSON , false , timeReceived )
342
314
return
343
315
}
344
316
345
317
// Is this an unused sequence notification?
346
318
if strings .HasPrefix (docID , c .metaKeys .UnusedSeqPrefix ()) {
347
- c .processUnusedSequence (ctx , docID , event . TimeReceived )
319
+ c .processUnusedSequence (ctx , docID , timeReceived )
348
320
return
349
321
}
350
322
if strings .HasPrefix (docID , c .metaKeys .UnusedSeqRangePrefix ()) {
@@ -454,7 +426,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
454
426
base .InfofCtx (ctx , base .KeyCache , "Received unused #%d in unused_sequences property for (%q / %q)" , seq , base .UD (docID ), syncData .CurrentRev )
455
427
change := & LogEntry {
456
428
Sequence : seq ,
457
- TimeReceived : event . TimeReceived ,
429
+ TimeReceived : timeReceived ,
458
430
CollectionID : event .CollectionID ,
459
431
}
460
432
changedChannels := c .processEntry (ctx , change )
@@ -479,7 +451,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
479
451
base .InfofCtx (ctx , base .KeyCache , "Received deduplicated #%d in recent_sequences property for (%q / %q)" , seq , base .UD (docID ), syncData .CurrentRev )
480
452
change := & LogEntry {
481
453
Sequence : seq ,
482
- TimeReceived : event . TimeReceived ,
454
+ TimeReceived : timeReceived ,
483
455
CollectionID : event .CollectionID ,
484
456
}
485
457
@@ -506,8 +478,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
506
478
DocID : docID ,
507
479
RevID : syncData .CurrentRev ,
508
480
Flags : syncData .Flags ,
509
- TimeReceived : event .TimeReceived ,
510
- TimeSaved : syncData .TimeSaved ,
481
+ TimeReceived : timeReceived ,
511
482
Channels : syncData .Channels ,
512
483
CollectionID : event .CollectionID ,
513
484
}
@@ -549,7 +520,7 @@ func (c *changeCache) unmarshalCachePrincipal(docJSON []byte) (cachePrincipal, e
549
520
}
550
521
551
522
// Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
552
- func (c * changeCache ) processUnusedSequence (ctx context.Context , docID string , timeReceived time. Time ) {
523
+ func (c * changeCache ) processUnusedSequence (ctx context.Context , docID string , timeReceived channels. FeedTimestamp ) {
553
524
sequenceStr := strings .TrimPrefix (docID , c .metaKeys .UnusedSeqPrefix ())
554
525
sequence , err := strconv .ParseUint (sequenceStr , 10 , 64 )
555
526
if err != nil {
@@ -560,7 +531,7 @@ func (c *changeCache) processUnusedSequence(ctx context.Context, docID string, t
560
531
561
532
}
562
533
563
- func (c * changeCache ) releaseUnusedSequence (ctx context.Context , sequence uint64 , timeReceived time. Time ) {
534
+ func (c * changeCache ) releaseUnusedSequence (ctx context.Context , sequence uint64 , timeReceived channels. FeedTimestamp ) {
564
535
change := & LogEntry {
565
536
Sequence : sequence ,
566
537
TimeReceived : timeReceived ,
@@ -583,7 +554,7 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
583
554
584
555
// releaseUnusedSequenceRange will handle unused sequence range arriving over DCP. It will batch remove from skipped or
585
556
// push a range to pending sequences, or both.
586
- func (c * changeCache ) releaseUnusedSequenceRange (ctx context.Context , fromSequence uint64 , toSequence uint64 , timeReceived time. Time ) {
557
+ func (c * changeCache ) releaseUnusedSequenceRange (ctx context.Context , fromSequence uint64 , toSequence uint64 , timeReceived channels. FeedTimestamp ) {
587
558
588
559
base .InfofCtx (ctx , base .KeyCache , "Received #%d-#%d (unused sequence range)" , fromSequence , toSequence )
589
560
@@ -613,7 +584,7 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
613
584
}
614
585
615
586
// processUnusedRange handles pushing unused range to pending or skipped lists
616
- func (c * changeCache ) processUnusedRange (ctx context.Context , fromSequence , toSequence uint64 , allChangedChannels channels.Set , timeReceived time. Time ) channels.Set {
587
+ func (c * changeCache ) processUnusedRange (ctx context.Context , fromSequence , toSequence uint64 , allChangedChannels channels.Set , timeReceived channels. FeedTimestamp ) channels.Set {
617
588
c .lock .Lock ()
618
589
defer c .lock .Unlock ()
619
590
@@ -622,7 +593,7 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
622
593
c .skippedSeqs .processUnusedSequenceRangeAtSkipped (ctx , fromSequence , toSequence )
623
594
} else if fromSequence >= c .nextSequence {
624
595
// whole range to pending
625
- c ._pushRangeToPending (ctx , fromSequence , toSequence , timeReceived )
596
+ c ._pushRangeToPending (fromSequence , toSequence , timeReceived )
626
597
// unblock any pending sequences we can after new range(s) have been pushed to pending
627
598
changedChannels := c ._addPendingLogs (ctx )
628
599
allChangedChannels = allChangedChannels .Update (changedChannels )
@@ -640,7 +611,7 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
640
611
}
641
612
642
613
// _pushRangeToPending will push an unused sequence range to pendingLogs
643
- func (c * changeCache ) _pushRangeToPending (ctx context. Context , startSeq , endSeq uint64 , timeReceived time. Time ) {
614
+ func (c * changeCache ) _pushRangeToPending (startSeq , endSeq uint64 , timeReceived channels. FeedTimestamp ) {
644
615
645
616
entry := & LogEntry {
646
617
TimeReceived : timeReceived ,
@@ -672,10 +643,10 @@ func (c *changeCache) processUnusedSequenceRange(ctx context.Context, docID stri
672
643
return
673
644
}
674
645
675
- c .releaseUnusedSequenceRange (ctx , fromSequence , toSequence , time . Now ())
646
+ c .releaseUnusedSequenceRange (ctx , fromSequence , toSequence , channels . NewFeedTimestampFromNow ())
676
647
}
677
648
678
- func (c * changeCache ) processPrincipalDoc (ctx context.Context , docID string , docJSON []byte , isUser bool , timeReceived time. Time ) {
649
+ func (c * changeCache ) processPrincipalDoc (ctx context.Context , docID string , docJSON []byte , isUser bool , timeReceived channels. FeedTimestamp ) {
679
650
680
651
// Currently the cache isn't really doing much with user docs; mostly it needs to know about
681
652
// them because they have sequence numbers, so without them the sequence of sequences would
@@ -818,9 +789,9 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) channel
818
789
base .DebugfCtx (ctx , base .KeyChanges , " #%d ==> channels %v" , change .Sequence , base .UD (updatedChannels ))
819
790
}
820
791
821
- if ! change .TimeReceived . IsZero () {
792
+ if change .TimeReceived != 0 {
822
793
c .db .DbStats .Database ().DCPCachingCount .Add (1 )
823
- c .db .DbStats .Database ().DCPCachingTime .Add (time . Since ( change .TimeReceived ). Nanoseconds ())
794
+ c .db .DbStats .Database ().DCPCachingTime .Add (change .TimeReceived . Since ())
824
795
}
825
796
826
797
return updatedChannels
@@ -849,7 +820,7 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
849
820
if oldestPending .IsUnusedRange () && oldestPending .EndSequence >= c .nextSequence {
850
821
c .nextSequence = oldestPending .EndSequence + 1
851
822
}
852
- } else if len (c .pendingLogs ) > c .options .CachePendingSeqMaxNum || time . Since ( c .pendingLogs [0 ].TimeReceived ) >= c .options .CachePendingSeqMaxWait {
823
+ } else if len (c .pendingLogs ) > c .options .CachePendingSeqMaxNum || c .pendingLogs [0 ].TimeReceived . OlderOrEqual ( c .options .CachePendingSeqMaxWait ) {
853
824
// Skip all sequences up to the oldest Pending
854
825
c .PushSkipped (ctx , c .nextSequence , oldestPending .Sequence - 1 )
855
826
c .nextSequence = oldestPending .Sequence
0 commit comments