Skip to content

Commit 734120b

Browse files
committed
feat: fix review comments
Signed-off-by: jyjiangkai <[email protected]>
1 parent 394164f commit 734120b

File tree

10 files changed

+82
-77
lines changed

10 files changed

+82
-77
lines changed

client/internal/vanus/store/block_store.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
216216

217217
var (
218218
err error
219-
wg sync.WaitGroup
220219
resp *segpb.AppendToBlockStreamResponse
221220
)
222221

@@ -230,8 +229,6 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
230229
// generate unique RequestId
231230
requestID := rand.New(rand.NewSource(time.Now().UnixNano())).Uint64()
232231

233-
wg.Add(1)
234-
235232
//TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
236233
eventpbs := make([]*cepb.CloudEvent, len(events))
237234
for idx := range events {
@@ -242,9 +239,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
242239
eventpbs[idx] = eventpb
243240
}
244241

242+
donec := make(chan struct{}, 1)
245243
s.appendCallbacks.Store(requestID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
246244
resp = res
247-
wg.Done()
245+
donec <- struct{}{}
248246
}))
249247

250248
req := &segpb.AppendToBlockStreamRequest{
@@ -275,7 +273,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
275273
return nil, err
276274
}
277275

278-
wg.Wait()
276+
<-donec
279277

280278
if resp.ResponseCode == errpb.ErrorCode_FULL {
281279
log.Warning(ctx, "block append failed cause the segment is full", nil)
@@ -339,7 +337,6 @@ func (s *BlockStore) ReadStream(
339337

340338
var (
341339
err error
342-
wg sync.WaitGroup
343340
resp *segpb.ReadFromBlockStreamResponse
344341
)
345342

@@ -353,11 +350,10 @@ func (s *BlockStore) ReadStream(
353350
// generate unique RequestId
354351
requestID := rand.Uint64()
355352

356-
wg.Add(1)
357-
358-
s.appendCallbacks.Store(requestID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
353+
donec := make(chan struct{}, 1)
354+
s.readCallbacks.Store(requestID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
359355
resp = res
360-
wg.Done()
356+
donec <- struct{}{}
361357
}))
362358

363359
req := &segpb.ReadFromBlockStreamRequest{
@@ -389,7 +385,7 @@ func (s *BlockStore) ReadStream(
389385
return []*ce.Event{}, err
390386
}
391387

392-
wg.Wait()
388+
<-donec
393389

394390
if resp.ResponseCode != errpb.ErrorCode_SUCCESS {
395391
log.Warning(ctx, "block read failed cause unknown error", nil)

client/pkg/eventbus/eventbus.go

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -469,13 +469,7 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.
469469
return "", err
470470
}
471471

472-
// 3. generate event ID
473-
var buf [16]byte
474-
binary.BigEndian.PutUint64(buf[0:8], lw.Log().ID())
475-
binary.BigEndian.PutUint64(buf[8:16], uint64(off))
476-
encoded := base64.StdEncoding.EncodeToString(buf[:])
477-
478-
return encoded, nil
472+
return genEventID(lw.Log().ID(), off), nil
479473
}
480474

481475
func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) (eid []string, err error) {
@@ -504,22 +498,22 @@ func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...
504498
}
505499

506500
// 3. generate event ID
507-
genFunc := func(off int64) string {
508-
var buf [16]byte
509-
binary.BigEndian.PutUint64(buf[0:8], lw.Log().ID())
510-
binary.BigEndian.PutUint64(buf[8:16], uint64(off))
511-
encoded := base64.StdEncoding.EncodeToString(buf[:])
512-
return encoded
513-
}
514-
515501
eventIDs := make([]string, len(offsets))
516502
for idx := range offsets {
517-
eventIDs[idx] = genFunc(offsets[idx])
503+
eventIDs[idx] = genEventID(lw.Log().ID(), offsets[idx])
518504
}
519505

520506
return eventIDs, nil
521507
}
522508

509+
func genEventID(logID uint64, off int64) string {
510+
var buf [16]byte
511+
binary.BigEndian.PutUint64(buf[0:8], logID)
512+
binary.BigEndian.PutUint64(buf[8:16], uint64(off))
513+
encoded := base64.StdEncoding.EncodeToString(buf[:])
514+
return encoded
515+
}
516+
523517
func (w *busWriter) Bus() api.Eventbus {
524518
return w.ebus
525519
}

client/pkg/eventlog/eventlog_impl.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -354,15 +354,14 @@ func (w *logWriter) Append(ctx context.Context, event *ce.Event) (int64, error)
354354
if err == nil {
355355
return offset, nil
356356
}
357-
vlog.Warning(ctx, "failed to Append", map[string]interface{}{
357+
vlog.Warning(ctx, "append failed", map[string]interface{}{
358358
vlog.KeyError: err,
359359
"offset": offset,
360360
})
361361
if errors.Is(err, errors.ErrFull) {
362362
vlog.Warning(ctx, "segment is full, retry", map[string]interface{}{
363363
vlog.KeyError: err,
364-
"offset": offset,
365-
"retry_num": i,
364+
"retry_times": i,
366365
})
367366
if i < retryTimes {
368367
continue
@@ -396,11 +395,15 @@ func (w *logWriter) AppendManyStream(ctx context.Context, events []*ce.Event) ([
396395
if err == nil {
397396
return offsets, nil
398397
}
399-
vlog.Warning(ctx, "failed to Append", map[string]interface{}{
398+
vlog.Warning(ctx, "append failed", map[string]interface{}{
400399
vlog.KeyError: err,
401400
"offsets": offsets,
402401
})
403402
if errors.Is(err, errors.ErrFull) {
403+
vlog.Warning(ctx, "segment is full, retry", map[string]interface{}{
404+
vlog.KeyError: err,
405+
"retry_times": i,
406+
})
404407
if i < retryTimes {
405408
continue
406409
}

internal/store/block/block.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type Reader interface {
4545
}
4646

4747
type Appender interface {
48-
Append(ctx context.Context, cb func([]int64, error), entries ...Entry)
48+
Append(ctx context.Context, entries []Entry, cb func([]int64, error))
4949
}
5050

5151
type Block interface {

internal/store/block/raft/appender.go

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ const (
4848
defaultHeartbeatTick = 3
4949
defaultMaxSizePerMsg = 4096
5050
defaultMaxInflightMsgs = 256
51-
defaultWaiterWorker = 8
5251
)
5352

5453
type Peer struct {
@@ -68,13 +67,17 @@ type peer struct {
6867

6968
type LeaderChangedListener func(block, leader vanus.ID, term uint64)
7069

71-
type commitWaiter struct {
70+
type CommitWaiter struct {
7271
seqs []int64
7372
offset int64
7473
err error
7574
callback func([]int64, error)
7675
}
7776

77+
func (w CommitWaiter) Do() {
78+
w.callback(w.seqs, w.err)
79+
}
80+
7881
type Appender interface {
7982
block.Appender
8083

@@ -89,8 +92,8 @@ type appender struct {
8992
actx block.AppendContext
9093
appendMu sync.RWMutex
9194

92-
waiters []commitWaiter
93-
waiterC chan commitWaiter
95+
waiters []CommitWaiter
96+
waiterC chan CommitWaiter
9497
commitIndex uint64
9598
commitOffset int64
9699
waitMu sync.Mutex
@@ -114,14 +117,14 @@ type appender struct {
114117
var _ Appender = (*appender)(nil)
115118

116119
func NewAppender(
117-
ctx context.Context, raw block.Raw, raftLog *raftlog.Log, host transport.Host, listener LeaderChangedListener,
120+
ctx context.Context, raw block.Raw, raftLog *raftlog.Log, host transport.Host, listener LeaderChangedListener, waiterC chan CommitWaiter,
118121
) Appender {
119122
ctx, cancel := context.WithCancel(ctx)
120123

121124
a := &appender{
122125
raw: raw,
123-
waiters: make([]commitWaiter, 0),
124-
waiterC: make(chan commitWaiter),
126+
waiters: make([]CommitWaiter, 0),
127+
waiterC: waiterC,
125128
listener: listener,
126129
log: raftLog,
127130
host: host,
@@ -200,29 +203,11 @@ func (a *appender) Delete(ctx context.Context) {
200203
a.log.Delete(ctx)
201204
}
202205

203-
func (a *appender) runWaiterWorker(ctx context.Context) {
204-
for i := 0; i < defaultWaiterWorker; i++ {
205-
go func() {
206-
for {
207-
select {
208-
case waiter := <-a.waiterC:
209-
waiter.callback(waiter.seqs, waiter.err)
210-
case <-ctx.Done():
211-
close(a.waiterC)
212-
return
213-
}
214-
}
215-
}()
216-
}
217-
}
218-
219206
func (a *appender) run(ctx context.Context) {
220207
// TODO(james.yin): reduce Ticker
221208
t := time.NewTicker(defaultTickInterval)
222209
defer t.Stop()
223210

224-
a.runWaiterWorker(ctx)
225-
226211
for {
227212
select {
228213
case <-t.C:
@@ -482,18 +467,24 @@ func (a *appender) reset(ctx context.Context) {
482467
}
483468

484469
// Append implements async block.raw.
485-
func (a *appender) Append(ctx context.Context, cb func([]int64, error), entries ...block.Entry) {
470+
func (a *appender) Append(ctx context.Context, entries []block.Entry, cb func([]int64, error)) {
486471
ctx, span := a.tracer.Start(ctx, "Append")
487472
defer span.End()
488473

474+
span.AddEvent("Acquiring append lock")
475+
a.appendMu.Lock()
476+
span.AddEvent("Got append lock")
477+
478+
defer a.appendMu.Unlock()
479+
489480
seqs, offset, err := a.append(ctx, entries)
490481
if err != nil && !errors.Is(err, errors.ErrFull) {
491482
cb(nil, err)
492483
return
493484
}
494485

495486
// register callback and wait until entries is committed.
496-
a.registerCommitWaiter(ctx, commitWaiter{
487+
a.registerCommitWaiter(ctx, CommitWaiter{
497488
seqs: seqs,
498489
offset: offset,
499490
err: err,
@@ -505,12 +496,6 @@ func (a *appender) append(ctx context.Context, entries []block.Entry) ([]int64,
505496
ctx, span := a.tracer.Start(ctx, "append")
506497
defer span.End()
507498

508-
span.AddEvent("Acquiring append lock")
509-
a.appendMu.Lock()
510-
span.AddEvent("Got append lock")
511-
512-
defer a.appendMu.Unlock()
513-
514499
if !a.isLeader() {
515500
return nil, 0, errors.ErrNotLeader.WithMessage("the appender is not leader")
516501
}
@@ -548,7 +533,7 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
548533
_, _ = a.raw.CommitAppend(ctx, frags...)
549534
}
550535

551-
func (a *appender) registerCommitWaiter(ctx context.Context, waiter commitWaiter) {
536+
func (a *appender) registerCommitWaiter(ctx context.Context, waiter CommitWaiter) {
552537
_, span := a.tracer.Start(ctx, "waitCommit")
553538
defer span.End()
554539

internal/store/segment/api.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package segment
1717
import (
1818
// standard libraries.
1919
"context"
20-
"sync"
2120

2221
// third-party libraries.
2322
cepb "cloudevents.io/genproto/v1"
@@ -124,17 +123,16 @@ func (s *segmentServer) AppendToBlock(
124123
var (
125124
err error
126125
offsets []int64
127-
wg sync.WaitGroup
128126
)
129-
wg.Add(1)
127+
donec := make(chan struct{})
130128
blockID := vanus.NewIDFromUint64(req.BlockId)
131129
events := req.Events.GetEvents()
132130
s.srv.AppendToBlock(ctx, blockID, events, func(offs []int64, e error) {
133131
offsets = offs
134132
err = e
135-
wg.Done()
133+
donec <- struct{}{}
136134
})
137-
wg.Wait()
135+
<-donec
138136
return &segpb.AppendToBlockResponse{Offsets: offsets}, err
139137
}
140138

@@ -165,6 +163,8 @@ func (s *segmentServer) AppendToBlockStream(stream segpb.SegmentServer_AppendToB
165163
})
166164
}
167165

166+
s.srv.NewMessageArrived(ctx, vanus.ID(request.BlockId))
167+
168168
err = stream.Send(&segpb.AppendToBlockStreamResponse{
169169
ResponseId: request.RequestId,
170170
ResponseCode: errCode,

internal/store/segment/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *server) recoverBlocks(ctx context.Context, logs map[vanus.ID]*raftlog.L
7878
return err
7979
}
8080
}
81-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged)
81+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.waiterC)
8282
s.replicas.Store(id, &replica{
8383
id: id,
8484
idStr: id.String(),

internal/store/segment/replica.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ func (r *replica) Read(ctx context.Context, seq int64, num int) ([]block.Entry,
8080
return r.raw.Read(ctx, seq, num)
8181
}
8282

83-
func (r *replica) Append(ctx context.Context, cb func([]int64, error), entries ...block.Entry) {
84-
r.appender.Append(ctx, cb, entries...)
83+
func (r *replica) Append(ctx context.Context, entries []block.Entry, cb func([]int64, error)) {
84+
r.appender.Append(ctx, entries, cb)
8585
}
8686

8787
func (r *replica) Status() *metapb.SegmentHealthInfo {
@@ -116,7 +116,7 @@ func (s *server) createBlock(ctx context.Context, id vanus.ID, size int64) (Repl
116116

117117
// Create replica.
118118
l := raftlog.NewLog(id, s.wal, s.metaStore, s.offsetStore, nil)
119-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged)
119+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.waiterC)
120120

121121
return &replica{
122122
id: id,

0 commit comments

Comments
 (0)