@@ -18,8 +18,8 @@ import (
18
18
// standard libraries
19
19
"context"
20
20
"io"
21
- "math/rand"
22
21
"sync"
22
+ "sync/atomic"
23
23
"time"
24
24
25
25
"github.com/linkall-labs/vanus/observability/tracing"
@@ -66,17 +66,18 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
66
66
return s , nil
67
67
}
68
68
69
- type streamstate string
69
+ type streamState string
70
70
71
71
var (
72
- stateRunning streamstate = "running"
73
- stateCLosed streamstate = "closed"
72
+ stateRunning streamState = "running"
73
+ stateClosed streamState = "closed"
74
74
)
75
75
76
76
type appendStreamCache struct {
77
+ opaqueID uint64
77
78
stream segpb.SegmentServer_AppendToBlockStreamClient
78
79
callbacks sync.Map
79
- state streamstate
80
+ state streamState
80
81
once sync.Once
81
82
}
82
83
@@ -85,7 +86,7 @@ func (a *appendStreamCache) isRunning() bool {
85
86
}
86
87
87
88
func (a * appendStreamCache ) isClosed () bool {
88
- return a .state == stateCLosed
89
+ return a .state == stateClosed
89
90
}
90
91
91
92
func (a * appendStreamCache ) release () {
@@ -96,27 +97,26 @@ func (a *appendStreamCache) release() {
96
97
func (a * appendStreamCache ) releaseStream () {
97
98
a .once .Do (func () {
98
99
a .stream .CloseSend ()
99
- a .state = stateCLosed
100
+ a .state = stateClosed
100
101
})
101
102
}
102
103
103
104
func (a * appendStreamCache ) releaseCallbacks () {
104
105
a .callbacks .Range (func (key , value interface {}) bool {
105
- if value != nil {
106
- value .(appendCallback )(& segpb.AppendToBlockStreamResponse {
107
- ResponseCode : errpb .ErrorCode_CLOSED ,
108
- ResponseMsg : "append stream closed" ,
109
- Offsets : []int64 {},
110
- })
111
- }
106
+ value .(appendCallback )(& segpb.AppendToBlockStreamResponse {
107
+ ResponseCode : errpb .ErrorCode_CLOSED ,
108
+ ResponseMsg : "append stream closed" ,
109
+ Offsets : []int64 {},
110
+ })
112
111
return true
113
112
})
114
113
}
115
114
116
115
type readStreamCache struct {
116
+ opaqueID uint64
117
117
stream segpb.SegmentServer_ReadFromBlockStreamClient
118
118
callbacks sync.Map
119
- state streamstate
119
+ state streamState
120
120
once sync.Once
121
121
}
122
122
@@ -125,7 +125,7 @@ func (r *readStreamCache) isRunning() bool {
125
125
}
126
126
127
127
func (r * readStreamCache ) isClosed () bool {
128
- return r .state == stateCLosed
128
+ return r .state == stateClosed
129
129
}
130
130
131
131
func (r * readStreamCache ) release () {
@@ -136,21 +136,19 @@ func (r *readStreamCache) release() {
136
136
func (r * readStreamCache ) releaseStream () {
137
137
r .once .Do (func () {
138
138
r .stream .CloseSend ()
139
- r .state = stateCLosed
139
+ r .state = stateClosed
140
140
})
141
141
}
142
142
143
143
func (r * readStreamCache ) releaseCallbacks () {
144
144
r .callbacks .Range (func (key , value interface {}) bool {
145
- if value != nil {
146
- value .(readCallback )(& segpb.ReadFromBlockStreamResponse {
147
- ResponseCode : errpb .ErrorCode_CLOSED ,
148
- ResponseMsg : "read stream closed" ,
149
- Events : & cepb.CloudEventBatch {
150
- Events : []* cepb.CloudEvent {},
151
- },
152
- })
153
- }
145
+ value .(readCallback )(& segpb.ReadFromBlockStreamResponse {
146
+ ResponseCode : errpb .ErrorCode_CLOSED ,
147
+ ResponseMsg : "read stream closed" ,
148
+ Events : & cepb.CloudEventBatch {
149
+ Events : []* cepb.CloudEvent {},
150
+ },
151
+ })
154
152
return true
155
153
})
156
154
}
@@ -335,7 +333,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
335
333
}
336
334
337
335
// generate unique opaqueID
338
- opaqueID := rand . New ( rand . NewSource ( time . Now (). UnixNano ())). Uint64 ( )
336
+ atomic . AddUint64 ( & append . opaqueID , 1 )
339
337
340
338
//TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
341
339
eventpbs := make ([]* cepb.CloudEvent , len (events ))
@@ -348,13 +346,13 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
348
346
}
349
347
350
348
donec := make (chan struct {})
351
- append .callbacks .Store (opaqueID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
349
+ append .callbacks .Store (append . opaqueID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
352
350
resp = res
353
351
close (donec )
354
352
}))
355
353
356
354
req := & segpb.AppendToBlockStreamRequest {
357
- Id : opaqueID ,
355
+ Id : append . opaqueID ,
358
356
BlockId : block ,
359
357
Events : & cepb.CloudEventBatch {
360
358
Events : eventpbs ,
@@ -369,10 +367,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
369
367
append .releaseStream ()
370
368
// reset new stream connections
371
369
s .connectAppendStream (ctx )
372
- c , _ := append .callbacks .LoadAndDelete (opaqueID )
370
+ c , _ := append .callbacks .LoadAndDelete (append . opaqueID )
373
371
if c != nil {
374
372
c .(appendCallback )(& segpb.AppendToBlockStreamResponse {
375
- Id : opaqueID ,
373
+ Id : append . opaqueID ,
376
374
ResponseCode : errpb .ErrorCode_CLOSED ,
377
375
ResponseMsg : "append stream closed" ,
378
376
Offsets : []int64 {},
@@ -384,10 +382,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
384
382
select {
385
383
case <- donec :
386
384
case <- _ctx .Done ():
387
- c , _ := append .callbacks .LoadAndDelete (opaqueID )
385
+ c , _ := append .callbacks .LoadAndDelete (append . opaqueID )
388
386
if c != nil {
389
387
c .(appendCallback )(& segpb.AppendToBlockStreamResponse {
390
- Id : opaqueID ,
388
+ Id : append . opaqueID ,
391
389
ResponseCode : errpb .ErrorCode_CONTEXT_CANCELED ,
392
390
ResponseMsg : "append stream context canceled" ,
393
391
Offsets : []int64 {},
@@ -465,10 +463,10 @@ func (s *BlockStore) ReadStream(
465
463
}
466
464
467
465
// generate unique RequestId
468
- opaqueID := rand . New ( rand . NewSource ( time . Now (). UnixNano ())). Uint64 ( )
466
+ atomic . AddUint64 ( & read . opaqueID , 1 )
469
467
470
468
donec := make (chan struct {})
471
- read .callbacks .Store (opaqueID , readCallback (func (res * segpb.ReadFromBlockStreamResponse ) {
469
+ read .callbacks .Store (read . opaqueID , readCallback (func (res * segpb.ReadFromBlockStreamResponse ) {
472
470
resp = res
473
471
close (donec )
474
472
}))
@@ -486,10 +484,10 @@ func (s *BlockStore) ReadStream(
486
484
})
487
485
read .releaseStream ()
488
486
s .connectReadStream (ctx )
489
- c , _ := read .callbacks .LoadAndDelete (opaqueID )
487
+ c , _ := read .callbacks .LoadAndDelete (read . opaqueID )
490
488
if c != nil {
491
489
c .(readCallback )(& segpb.ReadFromBlockStreamResponse {
492
- Id : opaqueID ,
490
+ Id : read . opaqueID ,
493
491
ResponseCode : errpb .ErrorCode_CLOSED ,
494
492
ResponseMsg : "read stream closed" ,
495
493
Events : & cepb.CloudEventBatch {
@@ -503,10 +501,10 @@ func (s *BlockStore) ReadStream(
503
501
select {
504
502
case <- donec :
505
503
case <- _ctx .Done ():
506
- c , _ := read .callbacks .LoadAndDelete (opaqueID )
504
+ c , _ := read .callbacks .LoadAndDelete (read . opaqueID )
507
505
if c != nil {
508
506
c .(readCallback )(& segpb.ReadFromBlockStreamResponse {
509
- Id : opaqueID ,
507
+ Id : read . opaqueID ,
510
508
ResponseCode : errpb .ErrorCode_CONTEXT_CANCELED ,
511
509
ResponseMsg : "read stream context canceled" ,
512
510
Events : & cepb.CloudEventBatch {
0 commit comments