Skip to content

Commit b837471

Browse files
author
Gleb Brozhe
committed
merging with main
1 parent 148de8d commit b837471

File tree

2 files changed

+76
-87
lines changed

2 files changed

+76
-87
lines changed

internal/balancer/balancer.go

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
9797
address = "ydb:///" + b.driverConfig.Endpoint()
9898
onDone = trace.DriverOnBalancerClusterDiscoveryAttempt(
9999
b.driverConfig.Trace(), &ctx,
100-
stack.FunctionID(""),
100+
stack.FunctionID(
101+
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).clusterDiscoveryAttempt"),
101102
address,
102103
)
103104
endpoints []endpoint.Endpoint
@@ -173,7 +174,8 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
173174
var (
174175
onDone = trace.DriverOnBalancerUpdate(
175176
b.driverConfig.Trace(), &ctx,
176-
stack.FunctionID(""),
177+
stack.FunctionID(
178+
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
177179
b.config.DetectLocalDC,
178180
)
179181
previousConns []conn.Conn
@@ -211,7 +213,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
211213
func (b *Balancer) Close(ctx context.Context) (err error) {
212214
onDone := trace.DriverOnBalancerClose(
213215
b.driverConfig.Trace(), &ctx,
214-
stack.FunctionID(""),
216+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).Close"),
215217
)
216218
defer func() {
217219
onDone(err)
@@ -234,10 +236,19 @@ func New(
234236
pool *conn.Pool,
235237
opts ...discoveryConfig.Option,
236238
) (b *Balancer, finalErr error) {
237-
onDone := trace.DriverOnBalancerInit(
238-
driverConfig.Trace(), &ctx,
239-
stack.FunctionID(""),
240-
driverConfig.Balancer().String(),
239+
var (
240+
onDone = trace.DriverOnBalancerInit(
241+
driverConfig.Trace(), &ctx,
242+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.New"),
243+
driverConfig.Balancer().String(),
244+
)
245+
discoveryConfig = discoveryConfig.New(append(opts,
246+
discoveryConfig.With(driverConfig.Common),
247+
discoveryConfig.WithEndpoint(driverConfig.Endpoint()),
248+
discoveryConfig.WithDatabase(driverConfig.Database()),
249+
discoveryConfig.WithSecure(driverConfig.Secure()),
250+
discoveryConfig.WithMeta(driverConfig.Meta()),
251+
)...)
241252
)
242253
defer func() {
243254
onDone(finalErr)
@@ -248,43 +259,28 @@ func New(
248259
pool: pool,
249260
localDCDetector: detectLocalDC,
250261
}
262+
d := internalDiscovery.New(ctx, pool.Get(
263+
endpoint.New(driverConfig.Endpoint()),
264+
), discoveryConfig)
265+
266+
b.discoveryClient = d
251267

252268
if config := driverConfig.Balancer(); config == nil {
253269
b.config = balancerConfig.Config{}
254270
} else {
255271
b.config = *config
256272
}
257273

258-
err := startDiscovery(ctx, b, opts)
259-
if err != nil {
260-
return nil, err
261-
}
262-
263-
return b, nil
264-
}
265-
266-
func startDiscovery(ctx context.Context, b *Balancer, opts []discoveryConfig.Option) error {
267-
discoveryConfig := discoveryConfig.New(append(opts,
268-
discoveryConfig.With(b.driverConfig.Common),
269-
discoveryConfig.WithEndpoint(b.driverConfig.Endpoint()),
270-
discoveryConfig.WithDatabase(b.driverConfig.Database()),
271-
discoveryConfig.WithSecure(b.driverConfig.Secure()),
272-
discoveryConfig.WithMeta(b.driverConfig.Meta()),
273-
)...)
274-
d := internalDiscovery.New(ctx, b.pool.Get(
275-
endpoint.New(b.driverConfig.Endpoint()),
276-
), discoveryConfig)
277-
278-
b.discoveryClient = d
279-
280274
if b.config.SingleConn {
281275
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{
282-
endpoint.New(b.driverConfig.Endpoint()),
276+
endpoint.New(driverConfig.Endpoint()),
283277
}, "")
284278
} else {
279+
// initialization of balancer state
285280
if err := b.clusterDiscovery(ctx); err != nil {
286-
return err
281+
return nil, xerrors.WithStackTrace(err)
287282
}
283+
// run background discovering
288284
if d := discoveryConfig.Interval(); d > 0 {
289285
b.discoveryRepeater = repeater.New(xcontext.ValueOnly(ctx),
290286
d, b.clusterDiscoveryAttempt,
@@ -294,7 +290,7 @@ func startDiscovery(ctx context.Context, b *Balancer, opts []discoveryConfig.Opt
294290
}
295291
}
296292

297-
return nil
293+
return b, nil
298294
}
299295

300296
func (b *Balancer) Invoke(
@@ -377,7 +373,7 @@ func (b *Balancer) connections() *connectionsState {
377373
func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
378374
onDone := trace.DriverOnBalancerChooseEndpoint(
379375
b.driverConfig.Trace(), &ctx,
380-
stack.FunctionID(""),
376+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).getConn"),
381377
)
382378
defer func() {
383379
if err == nil {

internal/scripting/client.go

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (c *Client) execute(
7272
) (r result.Result, err error) {
7373
var (
7474
onDone = trace.ScriptingOnExecute(c.config.Trace(), &ctx,
75-
stack.FunctionID(""),
75+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/scripting.(*Client).execute"),
7676
query, parameters,
7777
)
7878
a = allocator.New()
@@ -151,7 +151,7 @@ func (c *Client) explain(
151151
) (e table.ScriptingYQLExplanation, err error) {
152152
var (
153153
onDone = trace.ScriptingOnExplain(c.config.Trace(), &ctx,
154-
stack.FunctionID(""),
154+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/scripting.(*Client).explain"),
155155
query,
156156
)
157157
request = &Ydb_Scripting.ExplainYqlRequest{
@@ -218,53 +218,26 @@ func (c *Client) StreamExecute(
218218
return r, xerrors.WithStackTrace(err)
219219
}
220220

221-
func newReceiveStreamResultFunc(
222-
stream Ydb_Scripting_V1.ScriptingService_StreamExecuteYqlClient,
223-
traceErr func(error) func(error),
224-
) func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error) {
225-
return func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error) {
226-
defer func() {
227-
err := recover()
228-
if err != nil {
229-
errError := err.(error)
230-
traceErr(xerrors.HideEOF(errError))
231-
}
232-
}()
233-
select {
234-
case <-ctx.Done():
235-
return nil, nil, xerrors.WithStackTrace(ctx.Err())
236-
default:
237-
var response *Ydb_Scripting.ExecuteYqlPartialResponse
238-
response, err := stream.Recv()
239-
if err != nil {
240-
return nil, nil, xerrors.WithStackTrace(err)
241-
}
242-
243-
result := response.GetResult()
244-
if result == nil {
245-
return nil, nil, xerrors.WithStackTrace(errors.New("no result set"))
246-
}
247-
248-
return result.GetResultSet(), result.GetQueryStats(), nil
249-
}
250-
}
251-
}
252-
253221
func (c *Client) streamExecute(
254222
ctx context.Context,
255223
query string,
256224
parameters *params.Parameters,
257-
) (r scanner.StreamResult, err error) {
225+
) (r result.StreamResult, err error) {
258226
var (
259-
onIntermediate = trace.ScriptingOnStreamExecute(c.config.Trace(), &ctx, stack.FunctionID(""), query, parameters)
260-
a = allocator.New()
261-
request = &Ydb_Scripting.ExecuteYqlRequest{
227+
onIntermediate = trace.ScriptingOnStreamExecute(c.config.Trace(), &ctx,
228+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/scripting.(*Client).streamExecute"),
229+
query, parameters,
230+
)
231+
a = allocator.New()
232+
request = &Ydb_Scripting.ExecuteYqlRequest{
262233
Script: query,
263234
Parameters: parameters.ToYDB(a),
264235
OperationParams: operation.Params(
265-
ctx, c.config.OperationTimeout(),
236+
ctx,
237+
c.config.OperationTimeout(),
266238
c.config.OperationCancelAfter(),
267-
operation.ModeSync),
239+
operation.ModeSync,
240+
),
268241
}
269242
)
270243
defer func() {
@@ -275,33 +248,53 @@ func (c *Client) streamExecute(
275248
}()
276249

277250
ctx, cancel := xcontext.WithCancel(ctx)
278-
defer cancel()
279251

280252
stream, err := c.service.StreamExecuteYql(ctx, request)
281253
if err != nil {
282-
return nil, xerrors.WithStackTrace(err)
283-
}
284-
285-
recvFunc := newReceiveStreamResultFunc(stream, onIntermediate)
286-
onCloseFunc := func(err error) error {
287-
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
288-
289-
return err
290-
}
254+
cancel()
291255

292-
result, err := scanner.NewStream(ctx, recvFunc, onCloseFunc)
293-
if err != nil {
294256
return nil, xerrors.WithStackTrace(err)
295257
}
296258

297-
return result, nil
259+
return scanner.NewStream(ctx,
260+
func(ctx context.Context) (
261+
set *Ydb.ResultSet,
262+
stats *Ydb_TableStats.QueryStats,
263+
err error,
264+
) {
265+
defer func() {
266+
onIntermediate(xerrors.HideEOF(err))
267+
}()
268+
select {
269+
case <-ctx.Done():
270+
return nil, nil, xerrors.WithStackTrace(ctx.Err())
271+
default:
272+
var response *Ydb_Scripting.ExecuteYqlPartialResponse
273+
response, err = stream.Recv()
274+
result := response.GetResult()
275+
if result == nil || err != nil {
276+
return nil, nil, xerrors.WithStackTrace(err)
277+
}
278+
279+
return result.GetResultSet(), result.GetQueryStats(), nil
280+
}
281+
},
282+
func(err error) error {
283+
cancel()
284+
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
285+
286+
return err
287+
},
288+
)
298289
}
299290

300291
func (c *Client) Close(ctx context.Context) (err error) {
301292
if c == nil {
302293
return xerrors.WithStackTrace(errNilClient)
303294
}
304-
onDone := trace.ScriptingOnClose(c.config.Trace(), &ctx, stack.FunctionID(""))
295+
onDone := trace.ScriptingOnClose(c.config.Trace(), &ctx,
296+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/scripting.(*Client).Close"),
297+
)
305298
defer func() {
306299
onDone(err)
307300
}()

0 commit comments

Comments
 (0)