Skip to content

Commit 5513df2

Browse files
authored
Merge pull request #252 from ydb-platform/err-cluster-closed
* Fixed bug with unexpected failing of call `Invoke` and `NewStream` …
2 parents 38e8369 + e56c274 commit 5513df2

File tree

11 files changed

+310
-205
lines changed

11 files changed

+310
-205
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* Fixed bug with unexpected failing of call `Invoke` and `NewStream` on closed cluster
2+
* Fixed bug with releasing `internal/conn/conn.Pool` in cluster
3+
* Replaced interface `internal/conn/conn.Pool` to struct `internal/conn/conn.Pool`
4+
15
## v3.25.0
26
* Added `ydb.GRPCConn(ydb.Connection)` helper for connect to driver-unsupported YDB services
37
* Marked as deprecated `session.Prepare` callback

connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ type connection struct {
102102
ratelimiter *internalRatelimiter.Client
103103
ratelimiterOptions []ratelimiterConfig.Option
104104

105-
pool conn.Pool
105+
pool *conn.Pool
106106

107107
mtx sync.Mutex
108108
router router.Connection

internal/cluster/cluster.go

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,26 @@ const (
1919
MaxGetConnTimeout = 10 * time.Second
2020
)
2121

22+
// nolint: gofumpt
23+
// nolint: nolintlint
2224
var (
23-
// ErrClusterClosed returned when requested on a closed cluster.
24-
ErrClusterClosed = xerrors.Wrap(fmt.Errorf("cluster closed"))
25-
2625
// ErrClusterEmpty returned when no connections left in cluster.
2726
ErrClusterEmpty = xerrors.Wrap(fmt.Errorf("cluster empty"))
2827
)
2928

3029
type Cluster struct {
3130
config config.Config
32-
pool conn.Pool
31+
pool *conn.Pool
3332
conns []conn.Conn
3433

3534
balancer balancer.Balancer
3635

37-
done chan struct{}
38-
3936
onBadStateCallback balancer.OnBadStateCallback
4037
}
4138

42-
func (c *Cluster) isClosed() bool {
43-
select {
44-
case <-c.done:
45-
return true
46-
default:
47-
return false
48-
}
49-
}
50-
5139
// Ban connection in underling pool
5240
func (c *Cluster) Ban(ctx context.Context, cc conn.Conn, cause error) {
53-
c.pool.Pessimize(ctx, cc, cause)
41+
c.pool.Ban(ctx, cc, cause)
5442

5543
online := 0
5644
for _, cc := range c.conns {
@@ -64,15 +52,15 @@ func (c *Cluster) Ban(ctx context.Context, cc conn.Conn, cause error) {
6452
}
6553
}
6654

67-
// Unban connection in underling pool
68-
func (c *Cluster) Unban(ctx context.Context, cc conn.Conn) {
69-
c.pool.Unpessimize(ctx, cc)
55+
// Allow connection in underling pool
56+
func (c *Cluster) Allow(ctx context.Context, cc conn.Conn) {
57+
c.pool.Allow(ctx, cc)
7058
}
7159

7260
func New(
7361
ctx context.Context,
7462
config config.Config,
75-
pool conn.Pool,
63+
pool *conn.Pool,
7664
endpoints []endpoint.Endpoint,
7765
onBadStateCallback balancer.OnBadStateCallback,
7866
) *Cluster {
@@ -99,7 +87,6 @@ func New(
9987
)
10088

10189
return &Cluster{
102-
done: make(chan struct{}),
10390
config: config,
10491
pool: pool,
10592
conns: conns,
@@ -118,17 +105,12 @@ func (c *Cluster) Close(ctx context.Context) (err error) {
118105
onDone(err)
119106
}()
120107

121-
close(c.done)
122-
123-
return nil
108+
return c.pool.Release(ctx)
124109
}
125110

126111
func (c *Cluster) get(ctx context.Context) (cc conn.Conn, _ error) {
127112
for {
128113
select {
129-
case <-c.done:
130-
return nil, xerrors.WithStackTrace(ErrClusterClosed)
131-
132114
case <-ctx.Done():
133115
return nil, xerrors.WithStackTrace(ctx.Err())
134116

@@ -156,10 +138,6 @@ func (c *Cluster) get(ctx context.Context) (cc conn.Conn, _ error) {
156138
// Get returns next available connection.
157139
// It returns error on given deadline cancellation or when cluster become closed.
158140
func (c *Cluster) Get(ctx context.Context) (cc conn.Conn, err error) {
159-
if c.isClosed() {
160-
return nil, xerrors.WithStackTrace(ErrClusterClosed)
161-
}
162-
163141
var cancel context.CancelFunc
164142
// without client context deadline lock limited on MaxGetConnTimeout
165143
// cluster endpoints cannot be updated at this time

internal/conn/pool.go

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -14,36 +14,7 @@ import (
1414
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1515
)
1616

17-
type Pool interface {
18-
Getter
19-
Taker
20-
Releaser
21-
Pessimizer
22-
}
23-
24-
type Getter interface {
25-
Get(endpoint endpoint.Endpoint) Conn
26-
}
27-
28-
type Taker interface {
29-
Take(ctx context.Context) error
30-
}
31-
32-
type Releaser interface {
33-
Release(ctx context.Context) error
34-
}
35-
36-
type Pessimizer interface {
37-
Pessimize(ctx context.Context, cc Conn, cause error)
38-
Unpessimize(ctx context.Context, cc Conn)
39-
}
40-
41-
type PoolConfig interface {
42-
ConnectionTTL() time.Duration
43-
GrpcDialOptions() []grpc.DialOption
44-
}
45-
46-
type pool struct {
17+
type Pool struct {
4718
usages int64
4819
config Config
4920
mtx sync.RWMutex
@@ -52,7 +23,7 @@ type pool struct {
5223
done chan struct{}
5324
}
5425

55-
func (p *pool) Get(endpoint endpoint.Endpoint) Conn {
26+
func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
5627
p.mtx.Lock()
5728
defer p.mtx.Unlock()
5829

@@ -73,13 +44,13 @@ func (p *pool) Get(endpoint endpoint.Endpoint) Conn {
7344
return cc
7445
}
7546

76-
func (p *pool) remove(c *conn) {
47+
func (p *Pool) remove(c *conn) {
7748
p.mtx.Lock()
7849
defer p.mtx.Unlock()
7950
delete(p.conns, c.Endpoint().Address())
8051
}
8152

82-
func (p *pool) Pessimize(ctx context.Context, cc Conn, cause error) {
53+
func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
8354
e := cc.Endpoint().Copy()
8455

8556
p.mtx.RLock()
@@ -90,7 +61,7 @@ func (p *pool) Pessimize(ctx context.Context, cc Conn, cause error) {
9061
return
9162
}
9263

93-
trace.DriverOnPessimizeNode(
64+
trace.DriverOnConnBan(
9465
p.config.Trace(),
9566
&ctx,
9667
e,
@@ -99,7 +70,7 @@ func (p *pool) Pessimize(ctx context.Context, cc Conn, cause error) {
9970
)(cc.SetState(Banned))
10071
}
10172

102-
func (p *pool) Unpessimize(ctx context.Context, cc Conn) {
73+
func (p *Pool) Allow(ctx context.Context, cc Conn) {
10374
e := cc.Endpoint().Copy()
10475

10576
p.mtx.RLock()
@@ -110,19 +81,19 @@ func (p *pool) Unpessimize(ctx context.Context, cc Conn) {
11081
return
11182
}
11283

113-
trace.DriverOnUnpessimizeNode(p.config.Trace(),
84+
trace.DriverOnConnAllow(p.config.Trace(),
11485
&ctx,
11586
e,
11687
cc.GetState(),
11788
)(cc.SetState(Online))
11889
}
11990

120-
func (p *pool) Take(context.Context) error {
91+
func (p *Pool) Take(context.Context) error {
12192
atomic.AddInt64(&p.usages, 1)
12293
return nil
12394
}
12495

125-
func (p *pool) Release(ctx context.Context) error {
96+
func (p *Pool) Release(ctx context.Context) error {
12697
if atomic.AddInt64(&p.usages, -1) > 0 {
12798
return nil
12899
}
@@ -150,7 +121,7 @@ func (p *pool) Release(ctx context.Context) error {
150121
return nil
151122
}
152123

153-
func (p *pool) connParker(ctx context.Context, ttl, interval time.Duration) {
124+
func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
154125
ticker := time.NewTicker(interval)
155126
defer ticker.Stop()
156127
for {
@@ -172,7 +143,7 @@ func (p *pool) connParker(ctx context.Context, ttl, interval time.Duration) {
172143
}
173144
}
174145

175-
func (p *pool) collectConns() []*conn {
146+
func (p *Pool) collectConns() []*conn {
176147
p.mtx.RLock()
177148
defer p.mtx.RUnlock()
178149
conns := make([]*conn, 0, len(p.conns))
@@ -185,8 +156,8 @@ func (p *pool) collectConns() []*conn {
185156
func NewPool(
186157
ctx context.Context,
187158
config Config,
188-
) Pool {
189-
p := &pool{
159+
) *Pool {
160+
p := &Pool{
190161
usages: 1,
191162
config: config,
192163
opts: config.GrpcDialOptions(),

internal/router/router.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222

2323
type router struct {
2424
config config.Config
25-
pool conn.Pool
25+
pool *conn.Pool
2626

2727
clusterMtx sync.RWMutex
2828
clusterPtr *cluster.Cluster
@@ -99,7 +99,7 @@ func (r *router) Close(ctx context.Context) (err error) {
9999
func New(
100100
ctx context.Context,
101101
c config.Config,
102-
pool conn.Pool,
102+
pool *conn.Pool,
103103
opts ...discoveryConfig.Option,
104104
) (_ Connection, err error) {
105105
onDone := trace.DriverOnInit(
@@ -182,7 +182,17 @@ func (r *router) Invoke(
182182
return xerrors.WithStackTrace(err)
183183
}
184184

185-
defer r.handleConnRequestError(ctx, &err, cc)
185+
defer func() {
186+
if err == nil {
187+
if cc.GetState() == conn.Banned {
188+
r.cluster().Allow(ctx, cc)
189+
}
190+
} else {
191+
if xerrors.MustPessimizeEndpoint(err, r.config.ExcludeGRPCCodesForPessimization()...) {
192+
r.cluster().Ban(ctx, cc, err)
193+
}
194+
}
195+
}()
186196

187197
ctx, err = r.config.Meta().Meta(ctx)
188198
if err != nil {
@@ -202,13 +212,24 @@ func (r *router) NewStream(
202212
desc *grpc.StreamDesc,
203213
method string,
204214
opts ...grpc.CallOption,
205-
) (grpc.ClientStream, error) {
206-
cc, err := r.cluster().Get(ctx)
215+
) (_ grpc.ClientStream, err error) {
216+
var cc conn.Conn
217+
cc, err = r.cluster().Get(ctx)
207218
if err != nil {
208219
return nil, xerrors.WithStackTrace(err)
209220
}
210221

211-
defer r.handleConnRequestError(ctx, &err, cc)
222+
defer func() {
223+
if err == nil {
224+
if cc.GetState() == conn.Banned {
225+
r.cluster().Allow(ctx, cc)
226+
}
227+
} else {
228+
if xerrors.MustPessimizeEndpoint(err, r.config.ExcludeGRPCCodesForPessimization()...) {
229+
r.cluster().Ban(ctx, cc, err)
230+
}
231+
}
232+
}()
212233

213234
ctx, err = r.config.Meta().Meta(ctx)
214235
if err != nil {
@@ -223,13 +244,3 @@ func (r *router) NewStream(
223244

224245
return client, nil
225246
}
226-
227-
func (r *router) handleConnRequestError(ctx context.Context, perr *error, cc conn.Conn) {
228-
err := *perr
229-
if err == nil && cc.GetState() == conn.Banned {
230-
r.cluster().Unban(ctx, cc)
231-
}
232-
if err != nil && xerrors.MustPessimizeEndpoint(err, r.config.ExcludeGRPCCodesForPessimization()...) {
233-
r.cluster().Ban(ctx, cc, err)
234-
}
235-
}

internal/table/client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,6 @@ func (c *Client) keeper(ctx context.Context) {
645645
case
646646
xerrors.Is(
647647
err,
648-
cluster.ErrClusterClosed,
649648
cluster.ErrClusterEmpty,
650649
),
651650
xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION),

0 commit comments

Comments
 (0)