Skip to content

Commit 1abe765

Browse files
authored
Merge pull request #1663 from ydb-platform/extract-from-retry-must-delete-sessions
Extracted logic of checking error for delete session from retry.Retry
2 parents ca77afc + 10a0df2 commit 1abe765

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1056
-306
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Refactored behaviour on `retry.Retryable` error for retry object (such as session, connection or transaction)
2+
13
## v3.100.0
24
* Added `table.DescribeTable.StoreType` to table description result from `table.Session.DescribeTable` request
35

internal/coordination/client_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestCreateNode(t *testing.T) {
4747
)
4848
err := createNode(ctx, client, &Ydb_Coordination.CreateNodeRequest{})
4949
require.True(t, xerrors.IsTransportError(err, grpcCodes.ResourceExhausted))
50-
require.True(t, xerrors.IsRetryObjectValid(err))
50+
require.False(t, mustDeleteSession(err))
5151
})
5252
t.Run("OperationError", func(t *testing.T) {
5353
ctx := xtest.Context(t)
@@ -58,7 +58,7 @@ func TestCreateNode(t *testing.T) {
5858
)
5959
err := createNode(ctx, client, &Ydb_Coordination.CreateNodeRequest{})
6060
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
61-
require.True(t, xerrors.IsRetryObjectValid(err))
61+
require.False(t, mustDeleteSession(err))
6262
})
6363
}
6464

@@ -306,7 +306,7 @@ func TestAlterNode(t *testing.T) {
306306
)
307307
err := alterNode(ctx, client, &Ydb_Coordination.AlterNodeRequest{})
308308
require.True(t, xerrors.IsTransportError(err, grpcCodes.ResourceExhausted))
309-
require.True(t, xerrors.IsRetryObjectValid(err))
309+
require.False(t, mustDeleteSession(err))
310310
})
311311
t.Run("OperationError", func(t *testing.T) {
312312
ctx := xtest.Context(t)
@@ -317,7 +317,7 @@ func TestAlterNode(t *testing.T) {
317317
)
318318
err := alterNode(ctx, client, &Ydb_Coordination.AlterNodeRequest{})
319319
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
320-
require.True(t, xerrors.IsRetryObjectValid(err))
320+
require.False(t, mustDeleteSession(err))
321321
})
322322
}
323323

@@ -372,7 +372,7 @@ func TestDropNode(t *testing.T) {
372372
)
373373
err := dropNode(ctx, client, &Ydb_Coordination.DropNodeRequest{})
374374
require.True(t, xerrors.IsTransportError(err, grpcCodes.ResourceExhausted))
375-
require.True(t, xerrors.IsRetryObjectValid(err))
375+
require.False(t, mustDeleteSession(err))
376376
})
377377
t.Run("OperationError", func(t *testing.T) {
378378
ctx := xtest.Context(t)
@@ -383,6 +383,6 @@ func TestDropNode(t *testing.T) {
383383
)
384384
err := dropNode(ctx, client, &Ydb_Coordination.DropNodeRequest{})
385385
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
386-
require.True(t, xerrors.IsRetryObjectValid(err))
386+
require.False(t, mustDeleteSession(err))
387387
})
388388
}

internal/coordination/errors.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package coordination
2+
3+
import (
4+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
5+
grpcCodes "google.golang.org/grpc/codes"
6+
7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
8+
)
9+
10+
func mustDeleteSession(err error) bool {
11+
if xerrors.IsOperationError(err,
12+
Ydb.StatusIds_BAD_SESSION,
13+
Ydb.StatusIds_SESSION_BUSY,
14+
Ydb.StatusIds_SESSION_EXPIRED,
15+
) {
16+
return true
17+
}
18+
19+
if xerrors.IsTransportError(err,
20+
grpcCodes.Canceled,
21+
grpcCodes.Unknown,
22+
grpcCodes.InvalidArgument,
23+
grpcCodes.DeadlineExceeded,
24+
grpcCodes.NotFound,
25+
grpcCodes.AlreadyExists,
26+
grpcCodes.PermissionDenied,
27+
grpcCodes.FailedPrecondition,
28+
grpcCodes.Aborted,
29+
grpcCodes.Unimplemented,
30+
grpcCodes.Internal,
31+
grpcCodes.Unavailable,
32+
grpcCodes.DataLoss,
33+
grpcCodes.Unauthenticated,
34+
) {
35+
return true
36+
}
37+
38+
return false
39+
}

internal/pool/pool.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func New[PT ItemConstraint[T], T any](
151151
createTimeout: defaultCreateTimeout,
152152
closeTimeout: defaultCloseTimeout,
153153
mustDeleteItemFunc: func(item PT, err error) bool {
154-
return !xerrors.IsRetryObjectValid(err)
154+
return !item.IsAlive()
155155
},
156156
},
157157
index: make(map[PT]itemInfo[PT, T]),
@@ -380,6 +380,26 @@ func (p *Pool[PT, T]) changeState(changeState func() Stats) {
380380
}
381381
}
382382

383+
func (p *Pool[PT, T]) checkItemAndError(item PT, err error) error {
384+
if !item.IsAlive() {
385+
return errItemIsNotAlive
386+
}
387+
388+
if err == nil {
389+
return nil
390+
}
391+
392+
if p.config.mustDeleteItemFunc(item, err) {
393+
return err
394+
}
395+
396+
if !xerrors.IsValid(err, item) {
397+
return err
398+
}
399+
400+
return nil
401+
}
402+
383403
func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
384404
if onTry := p.config.trace.OnTry; onTry != nil {
385405
onDone := onTry(&ctx,
@@ -417,14 +437,14 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
417437
}
418438

419439
defer func() {
420-
if finalErr == nil || !p.config.mustDeleteItemFunc(item, finalErr) {
421-
_ = p.putItem(ctx, item)
422-
} else {
440+
if err := p.checkItemAndError(item, finalErr); err != nil {
423441
p.closeItem(ctx, item,
424442
closeItemWithLock(),
425443
closeItemNotifyStats(),
426444
closeItemWithDeleteFromPool(),
427445
)
446+
} else {
447+
_ = p.putItem(ctx, item)
428448
}
429449
}()
430450

internal/pool/pool_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
849849
var (
850850
newItems atomic.Int64
851851
deleteItems atomic.Int64
852-
expErr = xerrors.Retryable(errors.New("expected error"), xerrors.InvalidObject())
852+
expErr = errors.New("expected error")
853853
)
854854
p := New(rootCtx,
855855
WithLimit[*testItem, testItem](1),
@@ -874,7 +874,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
874874
)
875875
err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
876876
if newItems.Load() < 10 {
877-
return expErr
877+
return xerrors.Retryable(expErr, xerrors.Invalid(testItem))
878878
}
879879

880880
return nil

internal/query/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,13 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
561561
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
562562
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
563563
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
564+
pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool {
565+
if !s.IsAlive() {
566+
return true
567+
}
568+
569+
return err != nil && xerrors.MustDeleteTableOrQuerySession(err)
570+
}),
564571
pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()),
565572
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
566573
var (

internal/query/transaction.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -322,10 +322,13 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
322322
}
323323

324324
func (tx *Transaction) CommitTx(ctx context.Context) (finalErr error) {
325+
onDone := trace.QueryOnTxCommit(tx.s.trace, &ctx,
326+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).CommitTx"), tx.s, tx)
325327
defer func() {
326328
if finalErr != nil {
327329
applyStatusByError(tx.s, finalErr)
328330
}
331+
onDone(finalErr)
329332
}()
330333

331334
if tx.ID() == baseTx.LazyTxID {
@@ -367,12 +370,6 @@ func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
367370
}
368371

369372
func (tx *Transaction) Rollback(ctx context.Context) (finalErr error) {
370-
defer func() {
371-
if finalErr != nil {
372-
applyStatusByError(tx.s, finalErr)
373-
}
374-
}()
375-
376373
if tx.ID() == baseTx.LazyTxID {
377374
// https://github.com/ydb-platform/ydb-go-sdk/issues/1456
378375
return tx.s.Close(ctx)
@@ -382,6 +379,15 @@ func (tx *Transaction) Rollback(ctx context.Context) (finalErr error) {
382379
return nil
383380
}
384381

382+
onDone := trace.QueryOnTxRollback(tx.s.trace, &ctx,
383+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).Rollback"), tx.s, tx)
384+
defer func() {
385+
if finalErr != nil {
386+
applyStatusByError(tx.s, finalErr)
387+
}
388+
onDone(finalErr)
389+
}()
390+
385391
tx.completed = true
386392

387393
tx.notifyOnCompleted(ErrTransactionRollingBack)

internal/table/client.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
// sessionBuilder is the interface that holds logic of creating sessions.
2222
type sessionBuilder func(ctx context.Context) (*Session, error)
2323

24-
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client {
24+
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client { //nolint:funlen
2525
onDone := trace.TableOnInit(config.Trace(), &ctx,
2626
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.New"),
2727
)
@@ -39,6 +39,13 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
3939
pool.WithIdleTimeToLive[*Session, Session](config.IdleThreshold()),
4040
pool.WithCreateItemTimeout[*Session, Session](config.CreateSessionTimeout()),
4141
pool.WithCloseItemTimeout[*Session, Session](config.DeleteTimeout()),
42+
pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool {
43+
if !s.IsAlive() {
44+
return true
45+
}
46+
47+
return err != nil && xerrors.MustDeleteTableOrQuerySession(err)
48+
}),
4249
pool.WithClock[*Session, Session](config.Clock()),
4350
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
4451
return newSession(ctx, cc, config)
@@ -210,7 +217,9 @@ func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Optio
210217
onDone(attempts, finalErr)
211218
}()
212219

213-
err := do(ctx, c.pool, c.config, op, func(err error) {
220+
err := do(ctx, c.pool, c.config, func(ctx context.Context, s *Session) error {
221+
return op(ctx, s)
222+
}, func(err error) {
214223
attempts++
215224
}, config.RetryOptions...)
216225
if err != nil {
@@ -239,7 +248,7 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
239248
onDone(attempts, finalErr)
240249
}()
241250

242-
return retryBackoff(ctx, c.pool, func(ctx context.Context, s table.Session) (err error) {
251+
return retryBackoff(ctx, c.pool, func(ctx context.Context, s *Session) (err error) {
243252
attempts++
244253

245254
tx, err := s.BeginTransaction(ctx, config.TxSettings)
@@ -248,9 +257,7 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
248257
}
249258

250259
defer func() {
251-
if err != nil && !xerrors.IsOperationError(err) {
252-
_ = tx.Rollback(ctx)
253-
}
260+
_ = tx.Rollback(ctx)
254261
}()
255262

256263
if err = executeTxOperation(ctx, c, op, tx); err != nil {

internal/table/retry.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ func do(
2525
ctx context.Context,
2626
pool sessionPool,
2727
config *config.Config,
28-
op table.Operation,
28+
op func(ctx context.Context, s *Session) error,
2929
onAttempt func(err error),
3030
opts ...retry.Option,
3131
) (err error) {
3232
return retryBackoff(ctx, pool,
33-
func(ctx context.Context, s table.Session) (err error) {
33+
func(ctx context.Context, s *Session) (err error) {
3434
defer func() {
3535
if onAttempt != nil {
3636
onAttempt(err)
@@ -61,13 +61,11 @@ func do(
6161
func retryBackoff(
6262
ctx context.Context,
6363
pool sessionPool,
64-
op table.Operation,
64+
op func(ctx context.Context, s *Session) error,
6565
opts ...retry.Option,
6666
) error {
67-
return pool.With(ctx, func(ctx context.Context, s *Session) error {
67+
return pool.With(ctx, func(ctx context.Context, s *Session) (err error) {
6868
if err := op(ctx, s); err != nil {
69-
s.checkError(err)
70-
7169
return xerrors.WithStackTrace(err)
7270
}
7371

0 commit comments

Comments
 (0)