Skip to content

Commit b704d31

Browse files
committed
revert tryagain error
Signed-off-by: jyjiangkai <[email protected]>
1 parent 2278c46 commit b704d31

File tree

17 files changed

+201
-190
lines changed

17 files changed

+201
-190
lines changed

client/internal/vanus/store/block_store.go

Lines changed: 59 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -74,54 +74,52 @@ type BlockStore struct {
7474
readStream segpb.SegmentServer_ReadFromBlockStreamClient
7575
appendCallbacks sync.Map
7676
readCallbacks sync.Map
77-
appendMu sync.Mutex
78-
readMu sync.Mutex
77+
appendMu sync.RWMutex
78+
readMu sync.RWMutex
7979
}
8080

8181
type appendCallback func(*segpb.AppendToBlockStreamResponse)
8282
type readCallback func(*segpb.ReadFromBlockStreamResponse)
8383

8484
func (s *BlockStore) runAppendStreamRecv(ctx context.Context, stream segpb.SegmentServer_AppendToBlockStreamClient) {
85-
go func() {
86-
for {
87-
res, err := stream.Recv()
88-
if err != nil {
89-
log.Error(ctx, "append stream recv failed", map[string]interface{}{
90-
log.KeyError: err,
91-
})
92-
break
93-
}
94-
c, _ := s.appendCallbacks.LoadAndDelete(res.ResponseId)
95-
if c != nil {
96-
c.(appendCallback)(res)
97-
}
85+
for {
86+
res, err := stream.Recv()
87+
if err != nil {
88+
log.Error(ctx, "append stream recv failed", map[string]interface{}{
89+
log.KeyError: err,
90+
})
91+
break
92+
}
93+
c, _ := s.appendCallbacks.LoadAndDelete(res.Id)
94+
if c != nil {
95+
c.(appendCallback)(res)
9896
}
99-
}()
97+
}
10098
}
10199

102100
func (s *BlockStore) runReadStreamRecv(ctx context.Context, stream segpb.SegmentServer_ReadFromBlockStreamClient) {
103-
go func() {
104-
for {
105-
res, err := stream.Recv()
106-
if err != nil {
107-
log.Error(ctx, "read stream recv failed", map[string]interface{}{
108-
log.KeyError: err,
109-
})
110-
break
111-
}
112-
c, _ := s.readCallbacks.LoadAndDelete(res.ResponseId)
113-
if c != nil {
114-
c.(readCallback)(res)
115-
}
101+
for {
102+
res, err := stream.Recv()
103+
if err != nil {
104+
log.Error(ctx, "read stream recv failed", map[string]interface{}{
105+
log.KeyError: err,
106+
})
107+
break
116108
}
117-
}()
109+
c, _ := s.readCallbacks.LoadAndDelete(res.Id)
110+
if c != nil {
111+
c.(readCallback)(res)
112+
}
113+
}
118114
}
119115

120116
func (s *BlockStore) connectAppendStream(ctx context.Context) (segpb.SegmentServer_AppendToBlockStreamClient, error) {
117+
s.appendMu.RLock()
121118
if s.appendStream != nil {
119+
s.appendMu.RUnlock()
122120
return s.appendStream, nil
123121
}
124-
122+
s.appendMu.RUnlock()
125123
s.appendMu.Lock()
126124
defer s.appendMu.Unlock()
127125

@@ -142,15 +140,17 @@ func (s *BlockStore) connectAppendStream(ctx context.Context) (segpb.SegmentServ
142140
return nil, err
143141
}
144142

145-
s.runAppendStreamRecv(context.Background(), stream)
143+
go s.runAppendStreamRecv(context.Background(), stream)
146144
return stream, nil
147145
}
148146

149147
func (s *BlockStore) connectReadStream(ctx context.Context) (segpb.SegmentServer_ReadFromBlockStreamClient, error) {
148+
s.readMu.RLock()
150149
if s.readStream != nil {
150+
s.readMu.RUnlock()
151151
return s.readStream, nil
152152
}
153-
153+
s.readMu.RUnlock()
154154
s.readMu.Lock()
155155
defer s.readMu.Unlock()
156156

@@ -171,7 +171,7 @@ func (s *BlockStore) connectReadStream(ctx context.Context) (segpb.SegmentServer
171171
return nil, err
172172
}
173173

174-
s.runReadStreamRecv(ctx, stream)
174+
go s.runReadStreamRecv(ctx, stream)
175175
return stream, nil
176176
}
177177

@@ -180,6 +180,14 @@ func (s *BlockStore) Endpoint() string {
180180
}
181181

182182
func (s *BlockStore) Close() {
183+
if s.appendStream != nil {
184+
s.appendStream.CloseSend()
185+
s.appendStream = nil
186+
}
187+
if s.readStream != nil {
188+
s.readStream.CloseSend()
189+
s.readStream = nil
190+
}
183191
s.client.Close()
184192
}
185193

@@ -219,15 +227,13 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
219227
resp *segpb.AppendToBlockStreamResponse
220228
)
221229

222-
if s.appendStream == nil {
223-
s.appendStream, err = s.connectAppendStream(_ctx)
224-
if err != nil {
225-
return nil, err
226-
}
230+
s.appendStream, err = s.connectAppendStream(_ctx)
231+
if err != nil {
232+
return nil, err
227233
}
228234

229-
// generate unique RequestId
230-
requestID := rand.New(rand.NewSource(time.Now().UnixNano())).Uint64()
235+
// generate unique opaqueID
236+
opaqueID := rand.New(rand.NewSource(time.Now().UnixNano())).Uint64()
231237

232238
//TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
233239
eventpbs := make([]*cepb.CloudEvent, len(events))
@@ -240,14 +246,14 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
240246
}
241247

242248
donec := make(chan struct{}, 1)
243-
s.appendCallbacks.Store(requestID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
249+
s.appendCallbacks.Store(opaqueID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
244250
resp = res
245251
donec <- struct{}{}
246252
}))
247253

248254
req := &segpb.AppendToBlockStreamRequest{
249-
RequestId: requestID,
250-
BlockId: block,
255+
Id: opaqueID,
256+
BlockId: block,
251257
Events: &cepb.CloudEventBatch{
252258
Events: eventpbs,
253259
},
@@ -260,10 +266,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
260266
if stderr.Is(err, io.EOF) {
261267
s.appendStream.CloseSend()
262268
s.appendStream = nil
263-
c, _ := s.appendCallbacks.LoadAndDelete(requestID)
269+
c, _ := s.appendCallbacks.LoadAndDelete(opaqueID)
264270
if c != nil {
265271
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
266-
ResponseId: requestID,
272+
Id: opaqueID,
267273
ResponseCode: errpb.ErrorCode_CLOSED,
268274
ResponseMsg: "append stream closed",
269275
Offsets: []int64{},
@@ -340,18 +346,16 @@ func (s *BlockStore) ReadStream(
340346
resp *segpb.ReadFromBlockStreamResponse
341347
)
342348

343-
if s.readStream == nil {
344-
s.readStream, err = s.connectReadStream(_ctx)
345-
if err != nil {
346-
return []*ce.Event{}, err
347-
}
349+
s.readStream, err = s.connectReadStream(_ctx)
350+
if err != nil {
351+
return []*ce.Event{}, err
348352
}
349353

350354
// generate unique RequestId
351-
requestID := rand.Uint64()
355+
opaqueID := rand.Uint64()
352356

353357
donec := make(chan struct{}, 1)
354-
s.readCallbacks.Store(requestID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
358+
s.readCallbacks.Store(opaqueID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
355359
resp = res
356360
donec <- struct{}{}
357361
}))
@@ -370,10 +374,10 @@ func (s *BlockStore) ReadStream(
370374
if stderr.Is(err, io.EOF) {
371375
s.readStream.CloseSend()
372376
s.readStream = nil
373-
c, _ := s.readCallbacks.LoadAndDelete(requestID)
377+
c, _ := s.readCallbacks.LoadAndDelete(opaqueID)
374378
if c != nil {
375379
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
376-
ResponseId: requestID,
380+
Id: opaqueID,
377381
ResponseCode: errpb.ErrorCode_CLOSED,
378382
ResponseMsg: "read stream closed",
379383
Events: &cepb.CloudEventBatch{

client/pkg/eventlog/eventlog_impl.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,9 @@ func (r *logReader) ReadStream(ctx context.Context, size int16) ([]*ce.Event, er
505505
if err != nil {
506506
if errors.Is(err, errors.ErrOffsetOverflow) {
507507
r.elog.refreshReadableSegments(ctx)
508-
r.switchSegment(ctx)
508+
if r.switchSegment(ctx) {
509+
return nil, errors.ErrTryAgain
510+
}
509511
}
510512
return nil, err
511513
}

internal/store/block/raft/appender.go

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,19 @@ type peer struct {
6767

6868
type LeaderChangedListener func(block, leader vanus.ID, term uint64)
6969

70-
type CommitWaiter struct {
70+
type CallBack struct {
7171
seqs []int64
72-
offset int64
7372
err error
7473
callback func([]int64, error)
7574
}
7675

77-
func (w CommitWaiter) Do() {
78-
w.callback(w.seqs, w.err)
76+
func (c CallBack) Do() {
77+
c.callback(c.seqs, c.err)
78+
}
79+
80+
type commitWaiter struct {
81+
offset int64
82+
cb CallBack
7983
}
8084

8185
type Appender interface {
@@ -92,8 +96,8 @@ type appender struct {
9296
actx block.AppendContext
9397
appendMu sync.RWMutex
9498

95-
waiters []CommitWaiter
96-
waiterC chan CommitWaiter
99+
waiters []commitWaiter
100+
callbackC chan CallBack
97101
commitIndex uint64
98102
commitOffset int64
99103
waitMu sync.Mutex
@@ -122,21 +126,21 @@ func NewAppender(
122126
raftLog *raftlog.Log,
123127
host transport.Host,
124128
listener LeaderChangedListener,
125-
waiterC chan CommitWaiter,
129+
callbackC chan CallBack,
126130
) Appender {
127131
ctx, cancel := context.WithCancel(ctx)
128132

129133
a := &appender{
130-
raw: raw,
131-
waiters: make([]CommitWaiter, 0),
132-
waiterC: waiterC,
133-
listener: listener,
134-
log: raftLog,
135-
host: host,
136-
hint: make([]peer, 0, defaultHintCapacity),
137-
cancel: cancel,
138-
doneC: make(chan struct{}),
139-
tracer: tracing.NewTracer("store.block.raft.appender", trace.SpanKindInternal),
134+
raw: raw,
135+
waiters: make([]commitWaiter, 0),
136+
callbackC: callbackC,
137+
listener: listener,
138+
log: raftLog,
139+
host: host,
140+
hint: make([]peer, 0, defaultHintCapacity),
141+
cancel: cancel,
142+
doneC: make(chan struct{}),
143+
tracer: tracing.NewTracer("store.block.raft.appender", trace.SpanKindInternal),
140144
}
141145
a.actx = a.raw.NewAppendContext(nil)
142146
a.commitOffset = a.actx.WriteOffset()
@@ -489,11 +493,13 @@ func (a *appender) Append(ctx context.Context, entries []block.Entry, cb func([]
489493
}
490494

491495
// register callback and wait until entries is committed.
492-
a.registerCommitWaiter(ctx, CommitWaiter{
493-
seqs: seqs,
494-
offset: offset,
495-
err: err,
496-
callback: cb,
496+
a.registerCommitWaiter(ctx, commitWaiter{
497+
offset: offset,
498+
cb: CallBack{
499+
seqs: seqs,
500+
err: err,
501+
callback: cb,
502+
},
497503
})
498504
}
499505

@@ -538,20 +544,21 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
538544
_, _ = a.raw.CommitAppend(ctx, frags...)
539545
}
540546

541-
func (a *appender) registerCommitWaiter(ctx context.Context, waiter CommitWaiter) {
547+
func (a *appender) registerCommitWaiter(ctx context.Context, waiter commitWaiter) {
542548
_, span := a.tracer.Start(ctx, "waitCommit")
543549
defer span.End()
544550

545551
span.AddEvent("Acquiring wait lock")
546552
a.waitMu.Lock()
547-
defer a.waitMu.Unlock()
548553
span.AddEvent("Got wait lock")
549554

550555
if waiter.offset <= a.commitOffset {
551-
waiter.callback(waiter.seqs, waiter.err)
556+
a.waitMu.Unlock()
557+
a.callbackC <- waiter.cb
552558
return
553559
}
554560
a.waiters = append(a.waiters, waiter)
561+
a.waitMu.Unlock()
555562
}
556563

557564
func (a *appender) doWakeup(ctx context.Context, commit int64) {
@@ -569,7 +576,7 @@ func (a *appender) doWakeup(ctx context.Context, commit int64) {
569576
if waiter.offset > commit {
570577
break
571578
}
572-
a.waiterC <- waiter
579+
a.callbackC <- waiter.cb
573580
a.waiters = a.waiters[1:]
574581
}
575582
a.commitOffset = commit

internal/store/segment/api.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (s *segmentServer) AppendToBlock(
124124
err error
125125
offsets []int64
126126
)
127-
donec := make(chan struct{})
127+
donec := make(chan struct{}, 1)
128128
blockID := vanus.NewIDFromUint64(req.BlockId)
129129
events := req.Events.GetEvents()
130130
s.srv.AppendToBlock(ctx, blockID, events, func(offs []int64, e error) {
@@ -163,10 +163,8 @@ func (s *segmentServer) AppendToBlockStream(stream segpb.SegmentServer_AppendToB
163163
})
164164
}
165165

166-
s.srv.NewMessageArrived(ctx, vanus.ID(request.BlockId))
167-
168166
err = stream.Send(&segpb.AppendToBlockStreamResponse{
169-
ResponseId: request.RequestId,
167+
Id: request.Id,
170168
ResponseCode: errCode,
171169
ResponseMsg: errMsg,
172170
Offsets: offsets,
@@ -221,7 +219,7 @@ func (s *segmentServer) ReadFromBlockStream(stream segpb.SegmentServer_ReadFromB
221219
}
222220

223221
err = stream.Send(&segpb.ReadFromBlockStreamResponse{
224-
ResponseId: request.RequestId,
222+
Id: request.Id,
225223
ResponseCode: errCode,
226224
ResponseMsg: errMsg,
227225
Events: &cepb.CloudEventBatch{Events: events},

internal/store/segment/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *server) recoverBlocks(ctx context.Context, logs map[vanus.ID]*raftlog.L
7878
return err
7979
}
8080
}
81-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.waiterC)
81+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
8282
s.replicas.Store(id, &replica{
8383
id: id,
8484
idStr: id.String(),

internal/store/segment/replica.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (s *server) createBlock(ctx context.Context, id vanus.ID, size int64) (Repl
116116

117117
// Create replica.
118118
l := raftlog.NewLog(id, s.wal, s.metaStore, s.offsetStore, nil)
119-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.waiterC)
119+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
120120

121121
return &replica{
122122
id: id,

0 commit comments

Comments
 (0)