Skip to content

Commit 288ec8c

Browse files
fix: fix timeout and series progress marker for same requests with different shards (#17125)
1 parent cc50361 commit 288ec8c

File tree

2 files changed

+53
-13
lines changed

2 files changed

+53
-13
lines changed

pkg/compactor/deletion/delete_requests_manager.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels,
343343
}
344344

345345
// The delete request touches the series. Do not skip if the series is not processed yet.
346-
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; !ok {
346+
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; !ok {
347347
return false
348348
}
349349
}
@@ -366,7 +366,7 @@ func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls
366366
var filterFuncs []filter.Func
367367

368368
for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests {
369-
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; ok {
369+
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; ok {
370370
continue
371371
}
372372
isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk)
@@ -478,9 +478,13 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
478478
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
479479
}
480480

481-
d.processedSeries = map[string]struct{}{}
482-
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
483-
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
481+
// When we hit a timeout, MarkPhaseTimedOut is called to clear the list of delete requests to avoid marking delete requests as processed.
482+
// Since this method is still called when we hit a timeout, we do not want to drop the progress so that deletion skips the already processed streams.
483+
if len(d.deleteRequestsToProcess) > 0 {
484+
d.processedSeries = map[string]struct{}{}
485+
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
486+
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
487+
}
484488
}
485489
}
486490

@@ -513,18 +517,18 @@ func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, l
513517
if !labels.Selector(req.matchers).Matches(lbls) {
514518
continue
515519
}
516-
processedSeriesKey := buildProcessedSeriesKey(req.RequestID, seriesID, tableName)
520+
processedSeriesKey := buildProcessedSeriesKey(req.RequestID, req.StartTime, req.EndTime, seriesID, tableName)
517521
if _, ok := d.processedSeries[processedSeriesKey]; ok {
518-
return fmt.Errorf("series for [table: %s, series: %s, user: %s, req: %s]", tableName, seriesID, userID, req.RequestID)
522+
return fmt.Errorf("series already marked as processed: [table: %s, user: %s, req_id: %s, start: %d, end: %d, series: %s]", tableName, userID, req.RequestID, req.StartTime, req.EndTime, seriesID)
519523
}
520524
d.processedSeries[processedSeriesKey] = struct{}{}
521525
}
522526

523527
return nil
524528
}
525529

526-
func buildProcessedSeriesKey(requestID string, seriesID []byte, tableName string) string {
527-
return fmt.Sprintf("%s/%s/%s", requestID, tableName, seriesID)
530+
func buildProcessedSeriesKey(requestID string, startTime, endTime model.Time, seriesID []byte, tableName string) string {
531+
return fmt.Sprintf("%s/%d/%d/%s/%s", requestID, startTime, endTime, tableName, seriesID)
528532
}
529533

530534
func getMaxRetentionInterval(userID string, limits Limits) time.Duration {

pkg/compactor/deletion/delete_requests_manager_test.go

+40-4
Original file line numberDiff line numberDiff line change
@@ -1019,10 +1019,6 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
10191019
user2 := []byte("user2")
10201020
lblFooBar := mustParseLabel(`{foo="bar"}`)
10211021
lblFizzBuzz := mustParseLabel(`{fizz="buzz"}`)
1022-
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
1023-
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
1024-
{RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived},
1025-
}}
10261022
type markSeriesProcessed struct {
10271023
userID, seriesID []byte
10281024
lbls labels.Labels
@@ -1189,6 +1185,11 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
11891185
} {
11901186
t.Run(tc.name, func(t *testing.T) {
11911187
workingDir := t.TempDir()
1188+
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
1189+
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
1190+
{RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived},
1191+
}}
1192+
11921193
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
11931194
require.NoError(t, err)
11941195
require.NoError(t, mgr.loadDeleteRequestsToProcess())
@@ -1207,6 +1208,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
12071208
mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
12081209
require.NoError(t, err)
12091210
require.Equal(t, storedSeriesProgress, mgr.processedSeries)
1211+
require.NoError(t, mgr.loadDeleteRequestsToProcess())
12101212

12111213
// when the mark phase ends, series progress should get cleared
12121214
mgr.MarkPhaseFinished()
@@ -1216,6 +1218,40 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
12161218
}
12171219
}
12181220

1221+
func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) {
1222+
workingDir := t.TempDir()
1223+
1224+
user1 := []byte("user1")
1225+
lblFooBar := mustParseLabel(`{foo="bar"}`)
1226+
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
1227+
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
1228+
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 100, EndTime: 200, Status: StatusReceived},
1229+
}}
1230+
1231+
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
1232+
require.NoError(t, err)
1233+
require.NoError(t, mgr.loadDeleteRequestsToProcess())
1234+
1235+
require.NoError(t, mgr.MarkSeriesAsProcessed(user1, []byte(lblFooBar.String()), lblFooBar, "t1"))
1236+
1237+
// timeout the retention processing
1238+
mgr.MarkPhaseTimedOut()
1239+
1240+
// timeout should not clear the series progress
1241+
mgr.MarkPhaseFinished()
1242+
require.Len(t, mgr.processedSeries, 2)
1243+
require.NoError(t, mgr.storeSeriesProgress())
1244+
require.FileExists(t, filepath.Join(workingDir, seriesProgressFilename))
1245+
1246+
// load the requests again for processing
1247+
require.NoError(t, mgr.loadDeleteRequestsToProcess())
1248+
1249+
// not hitting the timeout should clear the series progress
1250+
mgr.MarkPhaseFinished()
1251+
require.Len(t, mgr.processedSeries, 0)
1252+
require.NoFileExists(t, filepath.Join(workingDir, seriesProgressFilename))
1253+
}
1254+
12191255
type storeAddReqDetails struct {
12201256
userID, query string
12211257
startTime, endTime model.Time

0 commit comments

Comments
 (0)