Skip to content

Commit 1985156

Browse files
(2.12) Default AsyncFlush for replicated streams
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 833cc2a commit 1985156

File tree

6 files changed

+44
-62
lines changed

6 files changed

+44
-62
lines changed

server/filestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
679679

680680
if lmb := fs.lmb; lmb != nil {
681681
// Enable/disable async flush depending on if it's supported and already initialized.
682-
supportsAsyncFlush := cfg.AllowAsyncFlush && cfg.Replicas > 1
682+
supportsAsyncFlush := !fs.fcfg.SyncAlways && cfg.Replicas > 1
683683
if supportsAsyncFlush && !fs.fcfg.AsyncFlush {
684684
fs.fcfg.AsyncFlush = true
685685
lmb.spinUpFlushLoop()

server/jetstream_benchmark_test.go

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -858,14 +858,13 @@ func BenchmarkJetStreamPublish(b *testing.B) {
858858
replicas int
859859
messageSize int
860860
numSubjects int
861-
asyncFlush bool
862861
}{
863-
{1, 1, 10, 1, false}, // Single node, 10B messages
864-
{1, 1, 1024, 1, false}, // Single node, 1KB messages
865-
{3, 3, 10, 1, false}, // 3-nodes cluster, R=3, 10B messages
866-
{3, 3, 1024, 1, false}, // 3-nodes cluster, R=3, 1KB messages
867-
{3, 3, 10, 1, true}, // 3-nodes cluster, R=3, 10B messages (async flush)
868-
{3, 3, 1024, 1, true}, // 3-nodes cluster, R=3, 1KB messages (async flush)
862+
{1, 1, 10, 1}, // Single node, 10B messages
863+
{1, 1, 1024, 1}, // Single node, 1KB messages
864+
{3, 3, 10, 1}, // 3-nodes cluster, R=3, 10B messages
865+
{3, 3, 1024, 1}, // 3-nodes cluster, R=3, 1KB messages
866+
{3, 3, 10, 1}, // 3-nodes cluster, R=3, 10B messages (async flush)
867+
{3, 3, 1024, 1}, // 3-nodes cluster, R=3, 1KB messages (async flush)
869868
}
870869

871870
// All the cases above are run with each of the publisher cases below
@@ -887,10 +886,6 @@ func BenchmarkJetStreamPublish(b *testing.B) {
887886
bc.messageSize,
888887
bc.numSubjects,
889888
)
890-
if bc.asyncFlush {
891-
name += ",AsyncFlush"
892-
}
893-
894889
b.Run(
895890
name,
896891
func(b *testing.B) {
@@ -939,11 +934,10 @@ func BenchmarkJetStreamPublish(b *testing.B) {
939934
b.Logf("Creating stream with R=%d and %d input subjects", bc.replicas, bc.numSubjects)
940935
}
941936
_, err = jsStreamCreate(b, nc, &StreamConfig{
942-
Name: streamName,
943-
Subjects: subjects,
944-
Replicas: bc.replicas,
945-
Storage: FileStorage,
946-
AllowAsyncFlush: bc.asyncFlush,
937+
Name: streamName,
938+
Subjects: subjects,
939+
Replicas: bc.replicas,
940+
Storage: FileStorage,
947941
})
948942
if err != nil {
949943
b.Fatalf("Error creating stream: %v", err)
@@ -1942,11 +1936,10 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) {
19421936
replicasCases := []struct {
19431937
clusterSize int
19441938
replicas int
1945-
asyncFlush bool
19461939
}{
1947-
{1, 1, false},
1948-
{3, 3, false},
1949-
{3, 3, true},
1940+
{1, 1},
1941+
{3, 3},
1942+
{3, 3},
19501943
}
19511944

19521945
workload := func(b *testing.B, numPubs int, messageSize int64, clientUrl string) {
@@ -2056,9 +2049,6 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) {
20562049
// benchmark case matrix
20572050
for _, replicasCase := range replicasCases {
20582051
title := fmt.Sprintf("N=%d,R=%d", replicasCase.clusterSize, replicasCase.replicas)
2059-
if replicasCase.asyncFlush {
2060-
title += ",AsyncFlush"
2061-
}
20622052
b.Run(
20632053
title,
20642054
func(b *testing.B) {
@@ -2079,11 +2069,10 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) {
20792069

20802070
// create stream
20812071
_, err := jsStreamCreate(b, nc, &StreamConfig{
2082-
Name: streamName,
2083-
Subjects: []string{subject},
2084-
Replicas: replicasCase.replicas,
2085-
Storage: FileStorage,
2086-
AllowAsyncFlush: replicasCase.asyncFlush,
2072+
Name: streamName,
2073+
Subjects: []string{subject},
2074+
Replicas: replicasCase.replicas,
2075+
Storage: FileStorage,
20872076
})
20882077
if err != nil {
20892078
b.Fatal(err)

server/jetstream_cluster_1_test.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9059,12 +9059,18 @@ func TestJetStreamClusterSetPreferredToOnlineNode(t *testing.T) {
90599059
}
90609060

90619061
func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
9062-
test := func(t *testing.T, replicas int) {
9063-
supportsAsync := replicas > 1
9062+
test := func(t *testing.T, syncAlways bool) {
9063+
supportsAsync := !syncAlways
90649064

90659065
c := createJetStreamClusterExplicit(t, "R3S", 3)
90669066
defer c.shutdown()
90679067

9068+
for _, s := range c.servers {
9069+
s.optsMu.Lock()
9070+
s.opts.SyncAlways = syncAlways
9071+
s.optsMu.Unlock()
9072+
}
9073+
90689074
nc, js := jsClientConnect(t, c.randomServer())
90699075
defer nc.Close()
90709076

@@ -9101,11 +9107,10 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91019107
}
91029108

91039109
cfg := &StreamConfig{
9104-
Name: "TEST",
9105-
Subjects: []string{"foo"},
9106-
Storage: FileStorage,
9107-
Replicas: replicas,
9108-
AllowAsyncFlush: false,
9110+
Name: "TEST",
9111+
Subjects: []string{"foo"},
9112+
Storage: FileStorage,
9113+
Replicas: 1,
91099114
}
91109115
// Test disabled async flush on create.
91119116
_, err := jsStreamCreate(t, nc, cfg)
@@ -9116,15 +9121,18 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91169121
checkStoreIsAsync(false)
91179122

91189123
// Enabling async flush.
9119-
cfg.AllowAsyncFlush = true
9124+
cfg.Replicas = 3
91209125
_, err = jsStreamUpdate(t, nc, cfg)
91219126
require_NoError(t, err)
9127+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
9128+
return checkState(t, c, globalAccountName, "TEST")
9129+
})
91229130
_, err = js.Publish("foo", nil)
91239131
require_NoError(t, err)
91249132
checkStoreIsAsync(supportsAsync)
91259133

91269134
// Disabling async flush.
9127-
cfg.AllowAsyncFlush = false
9135+
cfg.Replicas = 1
91289136
_, err = jsStreamUpdate(t, nc, cfg)
91299137
require_NoError(t, err)
91309138
_, err = js.Publish("foo", nil)
@@ -9133,7 +9141,7 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91339141

91349142
// Test async flush on create.
91359143
require_NoError(t, js.DeleteStream("TEST"))
9136-
cfg.AllowAsyncFlush = true
9144+
cfg.Replicas = 3
91379145
_, err = jsStreamCreate(t, nc, cfg)
91389146
require_NoError(t, err)
91399147
s = c.streamLeader(globalAccountName, "TEST")
@@ -9142,8 +9150,8 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91429150
checkStoreIsAsync(supportsAsync)
91439151
}
91449152

9145-
t.Run("R1", func(t *testing.T) { test(t, 1) })
9146-
t.Run("R3", func(t *testing.T) { test(t, 3) })
9153+
t.Run("Default", func(t *testing.T) { test(t, false) })
9154+
t.Run("SyncAlways", func(t *testing.T) { test(t, true) })
91479155
}
91489156

91499157
func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) {
@@ -9154,11 +9162,10 @@ func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) {
91549162
defer nc.Close()
91559163

91569164
_, err := jsStreamCreate(t, nc, &StreamConfig{
9157-
Name: "TEST",
9158-
Subjects: []string{"foo"},
9159-
Storage: FileStorage,
9160-
Replicas: 3,
9161-
AllowAsyncFlush: true,
9165+
Name: "TEST",
9166+
Subjects: []string{"foo"},
9167+
Storage: FileStorage,
9168+
Replicas: 3,
91629169
})
91639170
require_NoError(t, err)
91649171

server/jetstream_versioning.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,6 @@ func setStaticStreamMetadata(cfg *StreamConfig) {
5555
requires(2)
5656
}
5757

58-
// Async flush was added in v2.12 and require API level 2.
59-
if cfg.AllowAsyncFlush {
60-
requires(2)
61-
}
62-
6358
cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
6459
}
6560

server/jetstream_versioning_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,6 @@ func TestJetStreamSetStaticStreamMetadata(t *testing.T) {
8282
cfg: &StreamConfig{AllowAtomicPublish: true},
8383
expectedMetadata: metadataAtLevel("2"),
8484
},
85-
{
86-
desc: "AllowAsyncFlush",
87-
cfg: &StreamConfig{AllowAsyncFlush: true},
88-
expectedMetadata: metadataAtLevel("2"),
89-
},
9085
} {
9186
t.Run(test.desc, func(t *testing.T) {
9287
setStaticStreamMetadata(test.cfg)

server/stream.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,6 @@ type StreamConfig struct {
116116
// AllowAtomicPublish allows atomic batch publishing into the stream.
117117
AllowAtomicPublish bool `json:"allow_atomic"`
118118

119-
// AllowAsyncFlush allows replicated streams to asynchronously flush
120-
// to the stream, improving throughput.
121-
AllowAsyncFlush bool `json:"allow_async_flush"`
122-
123119
// Metadata is additional metadata for the Stream.
124120
Metadata map[string]string `json:"metadata,omitempty"`
125121
}
@@ -798,12 +794,12 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
798794
}
799795
}
800796
fsCfg.StoreDir = storeDir
801-
// Async flushing is only allowed if the stream has a sync log backing it.
802-
fsCfg.AsyncFlush = config.AllowAsyncFlush && config.Replicas > 1
803797
// Grab configured sync interval.
804798
fsCfg.SyncInterval = s.getOpts().SyncInterval
805799
fsCfg.SyncAlways = s.getOpts().SyncAlways
806800
fsCfg.Compression = config.Compression
801+
// Async flushing is only allowed if the stream has a sync log backing it.
802+
fsCfg.AsyncFlush = !fsCfg.SyncAlways && config.Replicas > 1
807803

808804
if err := mset.setupStore(fsCfg); err != nil {
809805
mset.stop(true, false)

0 commit comments

Comments
 (0)