From 55aa1c067afa328a70d4ad88bc88f40eae3694cb Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 2 Aug 2024 19:57:07 +0300 Subject: [PATCH 1/3] added simple wrapper for grpc errors for getting nodeID and address --- internal/conn/conn.go | 4 ++-- internal/conn/errors.go | 31 +++++++++++++++++++++++++++++ internal/conn/errors_test.go | 18 +++++++++++++++++ internal/conn/grpc_client_stream.go | 6 +++--- internal/xerrors/operation.go | 8 ++++++++ internal/xerrors/transport.go | 12 +++++++++-- 6 files changed, 72 insertions(+), 7 deletions(-) diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 4a14b776f..8acbecd64 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -380,7 +380,7 @@ func invoke( defer onTransportError(ctx, err) if !useWrapping { - return opID, issues, err + return opID, issues, withConnInfo(err, nodeID, address) } if sentMark.canRetry() { @@ -545,7 +545,7 @@ func (c *conn) NewStream( }() if !useWrapping { - return nil, err + return nil, withConnInfo(err, c.NodeID(), c.Address()) } if sentMark.canRetry() { diff --git a/internal/conn/errors.go b/internal/conn/errors.go index 090dc4543..874a8aa39 100644 --- a/internal/conn/errors.go +++ b/internal/conn/errors.go @@ -38,3 +38,34 @@ func IsBadConn(err error, goodConnCodes ...grpcCodes.Code) bool { return true } + +type grpcError struct { + err error + + nodeID uint32 + address string +} + +func (e *grpcError) Error() string { + return e.err.Error() +} + +func (e *grpcError) As(target any) bool { + return xerrors.As(e.err, target) +} + +func (e *grpcError) NodeID() uint32 { + return e.nodeID +} + +func (e *grpcError) Address() string { + return e.address +} + +func withConnInfo(err error, nodeID uint32, address string) error { + return &grpcError{ + err: err, + nodeID: nodeID, + address: address, + } +} diff --git a/internal/conn/errors_test.go b/internal/conn/errors_test.go index dd6b4871a..20494157a 100644 --- a/internal/conn/errors_test.go +++ b/internal/conn/errors_test.go @@ -108,3 +108,21 @@ func TestIsBadConn(t *testing.T) { }) } } + +func TestGrpcError(t *testing.T) { + err := withConnInfo(grpcStatus.Error(grpcCodes.Unavailable, "test"), 123, "test:123") + require.Equal(t, `rpc error: code = Unavailable desc = test`, err.Error()) + var nodeID interface { + NodeID() uint32 + } + require.ErrorAs(t, err, &nodeID) + require.Equal(t, uint32(123), nodeID.NodeID()) + var address interface { + Address() string + } + require.ErrorAs(t, err, &address) + require.Equal(t, "test:123", address.Address()) + s, has := grpcStatus.FromError(err) + require.True(t, has) + require.Equal(t, grpcCodes.Unavailable, s.Code()) +} diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index 87c49658c..e718d62bb 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -58,7 +58,7 @@ func (s *grpcClientStream) CloseSend() (err error) { } if !s.wrapping { - return err + return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) } return xerrors.WithStackTrace(xerrors.Transport( @@ -97,7 +97,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { }() if !s.wrapping { - return err + return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) } if s.sentMark.canRetry() { @@ -156,7 +156,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { //nolint:funlen }() if !s.wrapping { - return err + return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) } if s.sentMark.canRetry() { diff --git a/internal/xerrors/operation.go b/internal/xerrors/operation.go index bc0e9e997..2b957811d 100644 --- a/internal/xerrors/operation.go +++ b/internal/xerrors/operation.go @@ -28,6 +28,14 @@ func (e *operationError) Code() int32 { return int32(e.code) } +func (e *operationError) NodeID() uint32 { + return e.nodeID +} + +func (e *operationError) Address() string { + return e.address +} + func (e *operationError) Name() string { return "operation/" + e.code.String() } diff --git a/internal/xerrors/transport.go b/internal/xerrors/transport.go index 8700652b1..16776df02 100644 --- a/internal/xerrors/transport.go +++ b/internal/xerrors/transport.go @@ -27,6 +27,14 @@ func (e *transportError) GRPCStatus() *grpcStatus.Status { func (e *transportError) isYdbError() {} +func (e *transportError) NodeID() uint32 { + return e.nodeID +} + +func (e *transportError) Address() string { + return e.address +} + func (e *transportError) Code() int32 { return int32(e.status.Code()) } @@ -134,8 +142,8 @@ func IsTransportError(err error, codes ...grpcCodes.Code) bool { var status *grpcStatus.Status if t := (*transportError)(nil); errors.As(err, &t) { status = t.status - } else if t, has := grpcStatus.FromError(err); has { - status = t + } else if s, has := grpcStatus.FromError(err); has { + status = s } if status != nil { if len(codes) == 0 { From da9f66b6cf4e80fdd971e0685e996717824542db Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 4 Feb 2025 21:26:00 +0300 Subject: [PATCH 2/3] fix --- CHANGELOG.md | 2 ++ internal/conn/errors.go | 29 +------------------------- internal/conn/errors_test.go | 10 ++++----- internal/xerrors/operation_test.go | 33 +++++++++++++++++++++++++++--- internal/xerrors/transport_test.go | 12 +++++++++-- 5 files changed, 48 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e78455c9..a02a971d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added nodeID and address to operation and transport errors + ## v3.99.4 * Fixed bug with wrong context on session closing * Fixed goroutine leak on closing `database/sql` driver diff --git a/internal/conn/errors.go b/internal/conn/errors.go index 874a8aa39..fbe610713 100644 --- a/internal/conn/errors.go +++ b/internal/conn/errors.go @@ -39,33 +39,6 @@ func IsBadConn(err error, goodConnCodes ...grpcCodes.Code) bool { return true } -type grpcError struct { - err error - - nodeID uint32 - address string -} - -func (e *grpcError) Error() string { - return e.err.Error() -} - -func (e *grpcError) As(target any) bool { - return xerrors.As(e.err, target) -} - -func (e *grpcError) NodeID() uint32 { - return e.nodeID -} - -func (e *grpcError) Address() string { - return e.address -} - func withConnInfo(err error, nodeID uint32, address string) error { - return &grpcError{ - err: err, - nodeID: nodeID, - address: address, - } + return xerrors.Transport(err, xerrors.WithNodeID(nodeID), xerrors.WithAddress(address)) } diff --git a/internal/conn/errors_test.go b/internal/conn/errors_test.go index 20494157a..118c99bec 100644 --- a/internal/conn/errors_test.go +++ b/internal/conn/errors_test.go @@ -109,19 +109,19 @@ func TestIsBadConn(t *testing.T) { } } -func TestGrpcError(t *testing.T) { - err := withConnInfo(grpcStatus.Error(grpcCodes.Unavailable, "test"), 123, "test:123") - require.Equal(t, `rpc error: code = Unavailable desc = test`, err.Error()) +func TestWithConnInfo(t *testing.T) { + err := withConnInfo(grpcStatus.Error(grpcCodes.Unavailable, "test"), 100500, "example.com:2135") + require.ErrorIs(t, err, grpcStatus.Error(grpcCodes.Unavailable, "test")) var nodeID interface { NodeID() uint32 } require.ErrorAs(t, err, &nodeID) - require.Equal(t, uint32(123), nodeID.NodeID()) + require.Equal(t, uint32(100500), nodeID.NodeID()) var address interface { Address() string } require.ErrorAs(t, err, &address) - require.Equal(t, "test:123", address.Address()) + require.Equal(t, "example.com:2135", address.Address()) s, has := grpcStatus.FromError(err) require.True(t, has) require.Equal(t, grpcCodes.Unavailable, s.Code()) diff --git a/internal/xerrors/operation_test.go b/internal/xerrors/operation_test.go index 41d735957..6de4fadb0 100644 --- a/internal/xerrors/operation_test.go +++ b/internal/xerrors/operation_test.go @@ -7,24 +7,30 @@ import ( "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) func TestIsOperationError(t *testing.T) { for _, tt := range []struct { + name string err error codes []Ydb.StatusIds_StatusCode match bool }{ // check only operation error with any ydb status code { + name: xtest.CurrentFileLine(), err: &operationError{code: Ydb.StatusIds_BAD_REQUEST}, match: true, }, { + name: xtest.CurrentFileLine(), err: fmt.Errorf("wrapped: %w", &operationError{code: Ydb.StatusIds_BAD_REQUEST}), match: true, }, { + name: xtest.CurrentFileLine(), err: Join( fmt.Errorf("test"), &operationError{code: Ydb.StatusIds_BAD_REQUEST}, @@ -34,16 +40,19 @@ func TestIsOperationError(t *testing.T) { }, // match ydb status code { + name: xtest.CurrentFileLine(), err: &operationError{code: Ydb.StatusIds_BAD_REQUEST}, codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_BAD_REQUEST}, match: true, }, { + name: xtest.CurrentFileLine(), err: fmt.Errorf("wrapped: %w", &operationError{code: Ydb.StatusIds_BAD_REQUEST}), codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_BAD_REQUEST}, match: true, }, { + name: xtest.CurrentFileLine(), err: Join( fmt.Errorf("test"), &operationError{code: Ydb.StatusIds_BAD_REQUEST}, @@ -54,16 +63,19 @@ func TestIsOperationError(t *testing.T) { }, // no match ydb status code { + name: xtest.CurrentFileLine(), err: &operationError{code: Ydb.StatusIds_BAD_REQUEST}, codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_ABORTED}, match: false, }, { + name: xtest.CurrentFileLine(), err: fmt.Errorf("wrapped: %w", &operationError{code: Ydb.StatusIds_BAD_REQUEST}), codes: []Ydb.StatusIds_StatusCode{Ydb.StatusIds_ABORTED}, match: false, }, { + name: xtest.CurrentFileLine(), err: Join( fmt.Errorf("test"), &operationError{code: Ydb.StatusIds_BAD_REQUEST}, @@ -73,7 +85,7 @@ func TestIsOperationError(t *testing.T) { match: false, }, } { - t.Run("", func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { require.Equal(t, tt.match, IsOperationError(tt.err, tt.codes...)) }) } @@ -81,10 +93,12 @@ func TestIsOperationError(t *testing.T) { func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) { for _, tt := range [...]struct { + name string err error isTLI bool }{ { + name: xtest.CurrentFileLine(), err: Operation( WithStatusCode(Ydb.StatusIds_ABORTED), WithIssues([]*Ydb_Issue.IssueMessage{{ @@ -94,6 +108,7 @@ func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) { isTLI: true, }, { + name: xtest.CurrentFileLine(), err: Operation( WithStatusCode(Ydb.StatusIds_OVERLOADED), WithIssues([]*Ydb_Issue.IssueMessage{{ @@ -103,12 +118,14 @@ func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) { isTLI: false, }, { + name: xtest.CurrentFileLine(), err: Operation( WithStatusCode(Ydb.StatusIds_ABORTED), ), isTLI: false, }, { + name: xtest.CurrentFileLine(), err: Operation( WithStatusCode(Ydb.StatusIds_ABORTED), WithIssues([]*Ydb_Issue.IssueMessage{{ @@ -120,7 +137,7 @@ func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) { isTLI: true, }, } { - t.Run("", func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { require.Equal(t, tt.isTLI, IsOperationErrorTransactionLocksInvalidated(tt.err)) }) } @@ -128,22 +145,32 @@ func TestIsOperationErrorTransactionLocksInvalidated(t *testing.T) { func Test_operationError_Error(t *testing.T) { for _, tt := range []struct { + name string err error text string }{ { + name: xtest.CurrentFileLine(), err: Operation(WithStatusCode(Ydb.StatusIds_BAD_REQUEST), WithAddress("localhost")), text: "operation/BAD_REQUEST (code = 400010, address = localhost)", }, { + name: xtest.CurrentFileLine(), + err: Operation(WithStatusCode(Ydb.StatusIds_BAD_REQUEST), WithNodeID(100500)), + text: "operation/BAD_REQUEST (code = 400010, nodeID = 100500)", + }, + { + name: xtest.CurrentFileLine(), err: Operation(WithStatusCode(Ydb.StatusIds_BAD_REQUEST)), text: "operation/BAD_REQUEST (code = 400010)", }, { + name: xtest.CurrentFileLine(), err: Operation(WithStatusCode(Ydb.StatusIds_BAD_SESSION)), text: "operation/BAD_SESSION (code = 400100)", }, { + name: xtest.CurrentFileLine(), err: Operation(WithStatusCode(Ydb.StatusIds_PRECONDITION_FAILED), WithIssues([]*Ydb_Issue.IssueMessage{ { Message: "issue one", @@ -177,7 +204,7 @@ func Test_operationError_Error(t *testing.T) { text: "operation/PRECONDITION_FAILED (code = 400120, issues = [{15:3 => #1 'issue one'},{#2 'issue two' [{test.yql:16:4 => #3 'issue three'},{#4 'issue four'}]}])", //nolint:lll }, } { - t.Run("", func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { require.Equal(t, tt.text, tt.err.Error()) }) } diff --git a/internal/xerrors/transport_test.go b/internal/xerrors/transport_test.go index bfb81fa86..4ec468ab1 100644 --- a/internal/xerrors/transport_test.go +++ b/internal/xerrors/transport_test.go @@ -145,19 +145,27 @@ func TestGrpcError(t *testing.T) { func TestTransportErrorString(t *testing.T) { for _, tt := range []struct { + name string err error text string }{ { + name: xtest.CurrentFileLine(), err: Transport(grpcStatus.Error(grpcCodes.FailedPrecondition, "")), text: "transport/FailedPrecondition (code = 9, source error = \"rpc error: code = FailedPrecondition desc = \")", }, { + name: xtest.CurrentFileLine(), err: Transport(grpcStatus.Error(grpcCodes.Unavailable, ""), WithAddress("localhost:2135")), text: "transport/Unavailable (code = 14, source error = \"rpc error: code = Unavailable desc = \", address: \"localhost:2135\")", //nolint:lll }, + { + name: xtest.CurrentFileLine(), + err: Transport(grpcStatus.Error(grpcCodes.Unavailable, ""), WithNodeID(100500)), + text: "transport/Unavailable (code = 14, source error = \"rpc error: code = Unavailable desc = \", nodeID = 100500)", //nolint:lll + }, } { - t.Run("", func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { require.Equal(t, tt.text, tt.err.Error()) }) } @@ -189,7 +197,7 @@ func TestTransportErrorName(t *testing.T) { name: "transport/Aborted", }, } { - t.Run("", func(t *testing.T) { + t.Run(tt.name, func(t *testing.T) { if tt.err == nil { require.Nil(t, TransportError(tt.err)) //nolint:testifylint } else { From 5effd5bc40f93b33d57f4e15c9a5144b6dcb1af4 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 6 Feb 2025 10:29:51 +0300 Subject: [PATCH 3/3] Apply suggestions from code review --- internal/conn/conn.go | 4 ++-- internal/conn/errors.go | 4 ---- internal/conn/errors_test.go | 18 ------------------ internal/conn/grpc_client_stream.go | 6 +++--- 4 files changed, 5 insertions(+), 27 deletions(-) diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 8acbecd64..4a14b776f 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -380,7 +380,7 @@ func invoke( defer onTransportError(ctx, err) if !useWrapping { - return opID, issues, withConnInfo(err, nodeID, address) + return opID, issues, err } if sentMark.canRetry() { @@ -545,7 +545,7 @@ func (c *conn) NewStream( }() if !useWrapping { - return nil, withConnInfo(err, c.NodeID(), c.Address()) + return nil, err } if sentMark.canRetry() { diff --git a/internal/conn/errors.go b/internal/conn/errors.go index fbe610713..090dc4543 100644 --- a/internal/conn/errors.go +++ b/internal/conn/errors.go @@ -38,7 +38,3 @@ func IsBadConn(err error, goodConnCodes ...grpcCodes.Code) bool { return true } - -func withConnInfo(err error, nodeID uint32, address string) error { - return xerrors.Transport(err, xerrors.WithNodeID(nodeID), xerrors.WithAddress(address)) -} diff --git a/internal/conn/errors_test.go b/internal/conn/errors_test.go index 118c99bec..dd6b4871a 100644 --- a/internal/conn/errors_test.go +++ b/internal/conn/errors_test.go @@ -108,21 +108,3 @@ func TestIsBadConn(t *testing.T) { }) } } - -func TestWithConnInfo(t *testing.T) { - err := withConnInfo(grpcStatus.Error(grpcCodes.Unavailable, "test"), 100500, "example.com:2135") - require.ErrorIs(t, err, grpcStatus.Error(grpcCodes.Unavailable, "test")) - var nodeID interface { - NodeID() uint32 - } - require.ErrorAs(t, err, &nodeID) - require.Equal(t, uint32(100500), nodeID.NodeID()) - var address interface { - Address() string - } - require.ErrorAs(t, err, &address) - require.Equal(t, "example.com:2135", address.Address()) - s, has := grpcStatus.FromError(err) - require.True(t, has) - require.Equal(t, grpcCodes.Unavailable, s.Code()) -} diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index e718d62bb..87c49658c 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -58,7 +58,7 @@ func (s *grpcClientStream) CloseSend() (err error) { } if !s.wrapping { - return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) + return err } return xerrors.WithStackTrace(xerrors.Transport( @@ -97,7 +97,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { }() if !s.wrapping { - return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) + return err } if s.sentMark.canRetry() { @@ -156,7 +156,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { //nolint:funlen }() if !s.wrapping { - return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) + return err } if s.sentMark.canRetry() {