Skip to content

Commit e24d783

Browse files
(2.12) Default AsyncFlush for replicated streams (#7163)
`AllowAsyncFlush` used to be an opt-in stream-level setting, which allows a replicated stream to asynchronously flush writes to the underlying JetStream stream. This is a safe operation as it's backed by a replicated log that _is_ flushed, and we make sure to flush any remaining async writes before snapshotting and compacting the log. After upgrading to 2.12, you'll see improved performance by default, with no action required by users. This setting is automatically enabled for streams with `Replicas > 1` and with the `SyncAlways` server setting disabled (default). Signed-off-by: Maurice van Veen <[email protected]>
2 parents 9406307 + 1985156 commit e24d783

File tree

6 files changed

+44
-62
lines changed

6 files changed

+44
-62
lines changed

server/filestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
679679

680680
if lmb := fs.lmb; lmb != nil {
681681
// Enable/disable async flush depending on if it's supported and already initialized.
682-
supportsAsyncFlush := cfg.AllowAsyncFlush && cfg.Replicas > 1
682+
supportsAsyncFlush := !fs.fcfg.SyncAlways && cfg.Replicas > 1
683683
if supportsAsyncFlush && !fs.fcfg.AsyncFlush {
684684
fs.fcfg.AsyncFlush = true
685685
lmb.spinUpFlushLoop()

server/jetstream_benchmark_test.go

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -858,14 +858,13 @@ func BenchmarkJetStreamPublish(b *testing.B) {
858858
replicas int
859859
messageSize int
860860
numSubjects int
861-
asyncFlush bool
862861
}{
863-
{1, 1, 10, 1, false}, // Single node, 10B messages
864-
{1, 1, 1024, 1, false}, // Single node, 1KB messages
865-
{3, 3, 10, 1, false}, // 3-nodes cluster, R=3, 10B messages
866-
{3, 3, 1024, 1, false}, // 3-nodes cluster, R=3, 1KB messages
867-
{3, 3, 10, 1, true}, // 3-nodes cluster, R=3, 10B messages (async flush)
868-
{3, 3, 1024, 1, true}, // 3-nodes cluster, R=3, 1KB messages (async flush)
862+
{1, 1, 10, 1}, // Single node, 10B messages
863+
{1, 1, 1024, 1}, // Single node, 1KB messages
864+
{3, 3, 10, 1}, // 3-nodes cluster, R=3, 10B messages
865+
{3, 3, 1024, 1}, // 3-nodes cluster, R=3, 1KB messages
866+
{3, 3, 10, 1}, // 3-nodes cluster, R=3, 10B messages (async flush)
867+
{3, 3, 1024, 1}, // 3-nodes cluster, R=3, 1KB messages (async flush)
869868
}
870869

871870
// All the cases above are run with each of the publisher cases below
@@ -887,10 +886,6 @@ func BenchmarkJetStreamPublish(b *testing.B) {
887886
bc.messageSize,
888887
bc.numSubjects,
889888
)
890-
if bc.asyncFlush {
891-
name += ",AsyncFlush"
892-
}
893-
894889
b.Run(
895890
name,
896891
func(b *testing.B) {
@@ -939,11 +934,10 @@ func BenchmarkJetStreamPublish(b *testing.B) {
939934
b.Logf("Creating stream with R=%d and %d input subjects", bc.replicas, bc.numSubjects)
940935
}
941936
_, err = jsStreamCreate(b, nc, &StreamConfig{
942-
Name: streamName,
943-
Subjects: subjects,
944-
Replicas: bc.replicas,
945-
Storage: FileStorage,
946-
AllowAsyncFlush: bc.asyncFlush,
937+
Name: streamName,
938+
Subjects: subjects,
939+
Replicas: bc.replicas,
940+
Storage: FileStorage,
947941
})
948942
if err != nil {
949943
b.Fatalf("Error creating stream: %v", err)
@@ -1942,11 +1936,10 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) {
19421936
replicasCases := []struct {
19431937
clusterSize int
19441938
replicas int
1945-
asyncFlush bool
19461939
}{
1947-
{1, 1, false},
1948-
{3, 3, false},
1949-
{3, 3, true},
1940+
{1, 1},
1941+
{3, 3},
1942+
{3, 3},
19501943
}
19511944

19521945
workload := func(b *testing.B, numPubs int, messageSize int64, clientUrl string) {
@@ -2056,9 +2049,6 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) {
20562049
// benchmark case matrix
20572050
for _, replicasCase := range replicasCases {
20582051
title := fmt.Sprintf("N=%d,R=%d", replicasCase.clusterSize, replicasCase.replicas)
2059-
if replicasCase.asyncFlush {
2060-
title += ",AsyncFlush"
2061-
}
20622052
b.Run(
20632053
title,
20642054
func(b *testing.B) {
@@ -2079,11 +2069,10 @@ func BenchmarkJetStreamPublishConcurrent(b *testing.B) {
20792069

20802070
// create stream
20812071
_, err := jsStreamCreate(b, nc, &StreamConfig{
2082-
Name: streamName,
2083-
Subjects: []string{subject},
2084-
Replicas: replicasCase.replicas,
2085-
Storage: FileStorage,
2086-
AllowAsyncFlush: replicasCase.asyncFlush,
2072+
Name: streamName,
2073+
Subjects: []string{subject},
2074+
Replicas: replicasCase.replicas,
2075+
Storage: FileStorage,
20872076
})
20882077
if err != nil {
20892078
b.Fatal(err)

server/jetstream_cluster_1_test.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9078,12 +9078,18 @@ func TestJetStreamClusterSetPreferredToOnlineNode(t *testing.T) {
90789078
}
90799079

90809080
func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
9081-
test := func(t *testing.T, replicas int) {
9082-
supportsAsync := replicas > 1
9081+
test := func(t *testing.T, syncAlways bool) {
9082+
supportsAsync := !syncAlways
90839083

90849084
c := createJetStreamClusterExplicit(t, "R3S", 3)
90859085
defer c.shutdown()
90869086

9087+
for _, s := range c.servers {
9088+
s.optsMu.Lock()
9089+
s.opts.SyncAlways = syncAlways
9090+
s.optsMu.Unlock()
9091+
}
9092+
90879093
nc, js := jsClientConnect(t, c.randomServer())
90889094
defer nc.Close()
90899095

@@ -9120,11 +9126,10 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91209126
}
91219127

91229128
cfg := &StreamConfig{
9123-
Name: "TEST",
9124-
Subjects: []string{"foo"},
9125-
Storage: FileStorage,
9126-
Replicas: replicas,
9127-
AllowAsyncFlush: false,
9129+
Name: "TEST",
9130+
Subjects: []string{"foo"},
9131+
Storage: FileStorage,
9132+
Replicas: 1,
91289133
}
91299134
// Test disabled async flush on create.
91309135
_, err := jsStreamCreate(t, nc, cfg)
@@ -9135,15 +9140,18 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91359140
checkStoreIsAsync(false)
91369141

91379142
// Enabling async flush.
9138-
cfg.AllowAsyncFlush = true
9143+
cfg.Replicas = 3
91399144
_, err = jsStreamUpdate(t, nc, cfg)
91409145
require_NoError(t, err)
9146+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
9147+
return checkState(t, c, globalAccountName, "TEST")
9148+
})
91419149
_, err = js.Publish("foo", nil)
91429150
require_NoError(t, err)
91439151
checkStoreIsAsync(supportsAsync)
91449152

91459153
// Disabling async flush.
9146-
cfg.AllowAsyncFlush = false
9154+
cfg.Replicas = 1
91479155
_, err = jsStreamUpdate(t, nc, cfg)
91489156
require_NoError(t, err)
91499157
_, err = js.Publish("foo", nil)
@@ -9152,7 +9160,7 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91529160

91539161
// Test async flush on create.
91549162
require_NoError(t, js.DeleteStream("TEST"))
9155-
cfg.AllowAsyncFlush = true
9163+
cfg.Replicas = 3
91569164
_, err = jsStreamCreate(t, nc, cfg)
91579165
require_NoError(t, err)
91589166
s = c.streamLeader(globalAccountName, "TEST")
@@ -9161,8 +9169,8 @@ func TestJetStreamClusterAsyncFlushBasics(t *testing.T) {
91619169
checkStoreIsAsync(supportsAsync)
91629170
}
91639171

9164-
t.Run("R1", func(t *testing.T) { test(t, 1) })
9165-
t.Run("R3", func(t *testing.T) { test(t, 3) })
9172+
t.Run("Default", func(t *testing.T) { test(t, false) })
9173+
t.Run("SyncAlways", func(t *testing.T) { test(t, true) })
91669174
}
91679175

91689176
func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) {
@@ -9173,11 +9181,10 @@ func TestJetStreamClusterAsyncFlushFileStoreFlushOnSnapshot(t *testing.T) {
91739181
defer nc.Close()
91749182

91759183
_, err := jsStreamCreate(t, nc, &StreamConfig{
9176-
Name: "TEST",
9177-
Subjects: []string{"foo"},
9178-
Storage: FileStorage,
9179-
Replicas: 3,
9180-
AllowAsyncFlush: true,
9184+
Name: "TEST",
9185+
Subjects: []string{"foo"},
9186+
Storage: FileStorage,
9187+
Replicas: 3,
91819188
})
91829189
require_NoError(t, err)
91839190

server/jetstream_versioning.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,6 @@ func setStaticStreamMetadata(cfg *StreamConfig) {
5555
requires(2)
5656
}
5757

58-
// Async flush was added in v2.12 and require API level 2.
59-
if cfg.AllowAsyncFlush {
60-
requires(2)
61-
}
62-
6358
cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
6459
}
6560

server/jetstream_versioning_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,6 @@ func TestJetStreamSetStaticStreamMetadata(t *testing.T) {
8282
cfg: &StreamConfig{AllowAtomicPublish: true},
8383
expectedMetadata: metadataAtLevel("2"),
8484
},
85-
{
86-
desc: "AllowAsyncFlush",
87-
cfg: &StreamConfig{AllowAsyncFlush: true},
88-
expectedMetadata: metadataAtLevel("2"),
89-
},
9085
} {
9186
t.Run(test.desc, func(t *testing.T) {
9287
setStaticStreamMetadata(test.cfg)

server/stream.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,6 @@ type StreamConfig struct {
116116
// AllowAtomicPublish allows atomic batch publishing into the stream.
117117
AllowAtomicPublish bool `json:"allow_atomic"`
118118

119-
// AllowAsyncFlush allows replicated streams to asynchronously flush
120-
// to the stream, improving throughput.
121-
AllowAsyncFlush bool `json:"allow_async_flush"`
122-
123119
// Metadata is additional metadata for the Stream.
124120
Metadata map[string]string `json:"metadata,omitempty"`
125121
}
@@ -798,12 +794,12 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
798794
}
799795
}
800796
fsCfg.StoreDir = storeDir
801-
// Async flushing is only allowed if the stream has a sync log backing it.
802-
fsCfg.AsyncFlush = config.AllowAsyncFlush && config.Replicas > 1
803797
// Grab configured sync interval.
804798
fsCfg.SyncInterval = s.getOpts().SyncInterval
805799
fsCfg.SyncAlways = s.getOpts().SyncAlways
806800
fsCfg.Compression = config.Compression
801+
// Async flushing is only allowed if the stream has a sync log backing it.
802+
fsCfg.AsyncFlush = !fsCfg.SyncAlways && config.Replicas > 1
807803

808804
if err := mset.setupStore(fsCfg); err != nil {
809805
mset.stop(true, false)

0 commit comments

Comments
 (0)