Skip to content

Commit 5c579a7

Browse files
committed
feat: allow different formats for messages streaming/unary
Streaming response are never aggregated, so wrapping messages into `repeated` container looks like overhead as there will always be just a single entry in `repeated` container. Pass `streaming` flag down to `Backend` response building methods to support this flow. Test service was adjusted with the new proto layout. Signed-off-by: Andrey Smirnov <[email protected]>
1 parent 6c9f7b3 commit 5c579a7

File tree

7 files changed

+120
-145
lines changed

7 files changed

+120
-145
lines changed

proxy/DOC.md

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,25 +84,27 @@ type Backend interface {
8484

8585
// AppendInfo is called to enhance response from the backend with additional data.
8686
//
87+
// Parameter streaming indicates if response is delivered in streaming mode or not.
88+
//
8789
// Usecase might be appending backend endpoint (or name) to the protobuf serialized response, so that response is enhanced
8890
// with source information. This is particularly important for one to many calls, when it is required to identify
8991
// response from each of the backends participating in the proxying.
9092
//
9193
// If not additional proxying is required, simply returning the buffer without changes works fine.
92-
AppendInfo(resp []byte) ([]byte, error)
94+
AppendInfo(streaming bool, resp []byte) ([]byte, error)
9395

9496
// BuildError is called to convert error from upstream into response field.
9597
//
9698
// BuildError is never called for one to one proxying, in that case all the errors are returned back to the caller
97-
// as grpc errors.
99+
// as grpc errors. Parameter streaming indicates if response is delivered in streaming mode or not.
98100
//
99101
// When proxying one to many, if one the requests fails or upstream returns an error, it is undesirable to fail the whole
100102
// request and discard responses from other backends. BuildError converts (marshals) error from backend into protobuf encoded
101103
// response which is analyzed by the caller, so that caller reaching out to N upstreams receives N1 successful responses and
102104
// N2 error responses so that N1 + N2 == N.
103105
//
104106
// If BuildError returns nil, error is returned as grpc error (failing whole request).
105-
BuildError(err error) ([]byte, error)
107+
BuildError(streaming bool, err error) ([]byte, error)
106108
}
107109
```
108110

@@ -148,15 +150,6 @@ func WithMethodNames(methodNames ...string) Option
148150
WithMethodNames configures list of method names to proxy for non-transparent
149151
handler.
150152

151-
#### func WithMode
152-
153-
```go
154-
func WithMode(mode Mode) Option
155-
```
156-
WithMode sets proxying mode: One2One or One2Many.
157-
158-
Default mode is One2One.
159-
160153
#### func WithStreamedDetector
161154

162155
```go
@@ -258,14 +251,14 @@ for one to one proxying.
258251
#### func (*SingleBackend) AppendInfo
259252

260253
```go
261-
func (sb *SingleBackend) AppendInfo(resp []byte) ([]byte, error)
254+
func (sb *SingleBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error)
262255
```
263256
AppendInfo is called to enhance response from the backend with additional data.
264257

265258
#### func (*SingleBackend) BuildError
266259

267260
```go
268-
func (sb *SingleBackend) BuildError(err error) ([]byte, error)
261+
func (sb *SingleBackend) BuildError(streaming bool, err error) ([]byte, error)
269262
```
270263
BuildError is called to convert error from upstream into response field.
271264

@@ -285,7 +278,7 @@ func (sb *SingleBackend) String() string
285278
#### type StreamDirector
286279

287280
```go
288-
type StreamDirector func(ctx context.Context, fullMethodName string) ([]Backend, error)
281+
type StreamDirector func(ctx context.Context, fullMethodName string) (Mode, []Backend, error)
289282
```
290283

291284
StreamDirector returns a list of Backend objects to forward the call to.

proxy/director.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,27 @@ type Backend interface {
3131

3232
// AppendInfo is called to enhance response from the backend with additional data.
3333
//
34+
// Parameter streaming indicates if response is delivered in streaming mode or not.
35+
//
3436
// Usecase might be appending backend endpoint (or name) to the protobuf serialized response, so that response is enhanced
3537
// with source information. This is particularly important for one to many calls, when it is required to identify
3638
// response from each of the backends participating in the proxying.
3739
//
3840
// If not additional proxying is required, simply returning the buffer without changes works fine.
39-
AppendInfo(resp []byte) ([]byte, error)
41+
AppendInfo(streaming bool, resp []byte) ([]byte, error)
4042

4143
// BuildError is called to convert error from upstream into response field.
4244
//
4345
// BuildError is never called for one to one proxying, in that case all the errors are returned back to the caller
44-
// as grpc errors.
46+
// as grpc errors. Parameter streaming indicates if response is delivered in streaming mode or not.
4547
//
4648
// When proxying one to many, if one the requests fails or upstream returns an error, it is undesirable to fail the whole
4749
// request and discard responses from other backends. BuildError converts (marshals) error from backend into protobuf encoded
4850
// response which is analyzed by the caller, so that caller reaching out to N upstreams receives N1 successful responses and
4951
// N2 error responses so that N1 + N2 == N.
5052
//
5153
// If BuildError returns nil, error is returned as grpc error (failing whole request).
52-
BuildError(err error) ([]byte, error)
54+
BuildError(streaming bool, err error) ([]byte, error)
5355
}
5456

5557
// SingleBackend implements a simple wrapper around get connection function of one to one proxying.
@@ -74,12 +76,12 @@ func (sb *SingleBackend) GetConnection(ctx context.Context) (context.Context, *g
7476
}
7577

7678
// AppendInfo is called to enhance response from the backend with additional data.
77-
func (sb *SingleBackend) AppendInfo(resp []byte) ([]byte, error) {
79+
func (sb *SingleBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error) {
7880
return resp, nil
7981
}
8082

8183
// BuildError is called to convert error from upstream into response field.
82-
func (sb *SingleBackend) BuildError(err error) ([]byte, error) {
84+
func (sb *SingleBackend) BuildError(streaming bool, err error) ([]byte, error) {
8385
return nil, nil
8486
}
8587

proxy/handler_one2many.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ func (s *handler) handlerOne2Many(fullMethodName string, serverStream grpc.Serve
5858
}
5959

6060
// formatError tries to format error from upstream as message to the client
61-
func (s *handler) formatError(src *backendConnection, backendErr error) ([]byte, error) {
62-
payload, err := src.backend.BuildError(backendErr)
61+
func (s *handler) formatError(streaming bool, src *backendConnection, backendErr error) ([]byte, error) {
62+
payload, err := src.backend.BuildError(streaming, backendErr)
6363
if err != nil {
6464
return nil, fmt.Errorf("error building error for %s: %w", src.backend, err)
6565
}
@@ -76,7 +76,7 @@ func (s *handler) formatError(src *backendConnection, backendErr error) ([]byte,
7676
// if sendError fails to deliver the error, error is returned
7777
// if sendError successfully delivers the error, nil is returned
7878
func (s *handler) sendError(src *backendConnection, dst grpc.ServerStream, backendErr error) error {
79-
payload, err := s.formatError(src, backendErr)
79+
payload, err := s.formatError(true, src, backendErr)
8080
if err != nil {
8181
return err
8282
}
@@ -100,7 +100,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
100100
go func(src *backendConnection) {
101101
errCh <- func() error {
102102
if src.connError != nil {
103-
payload, err := s.formatError(src, src.connError)
103+
payload, err := s.formatError(false, src, src.connError)
104104
if err != nil {
105105
return err
106106
}
@@ -120,7 +120,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
120120
return nil
121121
}
122122

123-
payload, err := s.formatError(src, err)
123+
payload, err := s.formatError(false, src, err)
124124
if err != nil {
125125
return err
126126
}
@@ -134,7 +134,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
134134
// This is the only place to do it nicely.
135135
md, err := src.clientStream.Header()
136136
if err != nil {
137-
payload, err := s.formatError(src, err)
137+
payload, err := s.formatError(false, src, err)
138138
if err != nil {
139139
return err
140140
}
@@ -149,7 +149,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
149149
}
150150

151151
var err error
152-
f.payload, err = src.backend.AppendInfo(f.payload)
152+
f.payload, err = src.backend.AppendInfo(false, f.payload)
153153
if err != nil {
154154
return fmt.Errorf("error appending info for %s: %w", src.backend, err)
155155
}
@@ -224,7 +224,7 @@ func (s *handler) forwardClientsToServerMultiStreaming(sources []backendConnecti
224224
}
225225

226226
var err error
227-
f.payload, err = src.backend.AppendInfo(f.payload)
227+
f.payload, err = src.backend.AppendInfo(true, f.payload)
228228
if err != nil {
229229
return fmt.Errorf("error appending info for %s: %w", src.backend, err)
230230
}

proxy/handler_one2many_test.go

Lines changed: 42 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,10 @@ func (s *assertingMultiService) PingList(ping *pb.PingRequest, stream pb.MultiSe
8282
// Send user trailers and headers.
8383
stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) //nolint: errcheck
8484
for i := 0; i < countListResponses; i++ {
85-
stream.Send(&pb.MultiPingReply{ //nolint: errcheck
86-
Response: []*pb.MultiPingResponse{
87-
{
88-
Value: ping.Value,
89-
Counter: int32(i),
90-
Server: s.server,
91-
},
92-
},
85+
stream.Send(&pb.MultiPingResponse{ //nolint: errcheck
86+
Value: ping.Value,
87+
Counter: int32(i),
88+
Server: s.server,
9389
})
9490
}
9591
stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) //nolint: errcheck
@@ -107,14 +103,10 @@ func (s *assertingMultiService) PingStream(stream pb.MultiService_PingStreamServ
107103
require.NoError(s.t, err, "can't fail reading stream")
108104
return err
109105
}
110-
pong := &pb.MultiPingReply{
111-
Response: []*pb.MultiPingResponse{
112-
{
113-
Value: ping.Value,
114-
Counter: counter,
115-
Server: s.server,
116-
},
117-
},
106+
pong := &pb.MultiPingResponse{
107+
Value: ping.Value,
108+
Counter: counter,
109+
Server: s.server,
118110
}
119111
if err := stream.Send(pong); err != nil {
120112
require.NoError(s.t, err, "can't fail sending back a pong")
@@ -162,7 +154,17 @@ func (b *assertingBackend) GetConnection(ctx context.Context) (context.Context,
162154
return outCtx, b.conn, err
163155
}
164156

165-
func (b *assertingBackend) AppendInfo(resp []byte) ([]byte, error) {
157+
func (b *assertingBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error) {
158+
payload, err := proto.Marshal(&pb.ResponseMetadataPrepender{
159+
Metadata: &pb.ResponseMetadata{
160+
Hostname: fmt.Sprintf("server%d", b.i),
161+
},
162+
})
163+
164+
if streaming {
165+
return append(resp, payload...), err
166+
}
167+
166168
// decode protobuf embedded header
167169
typ, n1 := proto.DecodeVarint(resp)
168170
_, n2 := proto.DecodeVarint(resp[n1:]) // length
@@ -171,12 +173,6 @@ func (b *assertingBackend) AppendInfo(resp []byte) ([]byte, error) {
171173
return nil, fmt.Errorf("unexpected message format: %d", typ)
172174
}
173175

174-
payload, err := proto.Marshal(&pb.ResponseMetadataPrepender{
175-
Metadata: &pb.ResponseMetadata{
176-
Hostname: fmt.Sprintf("server%d", b.i),
177-
},
178-
})
179-
180176
// cut off embedded message header
181177
resp = resp[n1+n2:]
182178
// build new embedded message header
@@ -186,8 +182,8 @@ func (b *assertingBackend) AppendInfo(resp []byte) ([]byte, error) {
186182
return append(resp, payload...), err
187183
}
188184

189-
func (b *assertingBackend) BuildError(err error) ([]byte, error) {
190-
return proto.Marshal(&pb.EmptyReply{
185+
func (b *assertingBackend) BuildError(streaming bool, err error) ([]byte, error) {
186+
resp := &pb.EmptyReply{
191187
Response: []*pb.EmptyResponse{
192188
{
193189
Metadata: &pb.ResponseMetadata{
@@ -196,7 +192,13 @@ func (b *assertingBackend) BuildError(err error) ([]byte, error) {
196192
},
197193
},
198194
},
199-
})
195+
}
196+
197+
if streaming {
198+
return proto.Marshal(resp.Response[0])
199+
}
200+
201+
return proto.Marshal(resp)
200202
}
201203

202204
type ProxyOne2ManySuite struct {
@@ -350,8 +352,7 @@ func (s *ProxyOne2ManySuite) TestPingStreamErrorPropagatesAppError() {
350352
resp, err := stream.Recv()
351353
s.Require().NoError(err)
352354

353-
s.Assert().Len(resp.Response, 1)
354-
s.Assert().Equal("rpc error: code = FailedPrecondition desc = Userspace error.", resp.Response[0].Metadata.UpstreamError)
355+
s.Assert().Equal("rpc error: code = FailedPrecondition desc = Userspace error.", resp.Metadata.UpstreamError)
355356
}
356357

357358
require.NoError(s.T(), stream.CloseSend(), "no error on close send")
@@ -373,8 +374,7 @@ func (s *ProxyOne2ManySuite) TestPingStreamConnError() {
373374
resp, err := stream.Recv()
374375
s.Require().NoError(err)
375376

376-
s.Assert().Len(resp.Response, 1)
377-
s.Assert().Equal("rpc error: code = Unavailable desc = backend connection failed", resp.Response[0].Metadata.UpstreamError)
377+
s.Assert().Equal("rpc error: code = Unavailable desc = backend connection failed", resp.Metadata.UpstreamError)
378378

379379
_, err = stream.Recv()
380380
require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaning OK")
@@ -407,11 +407,10 @@ func (s *ProxyOne2ManySuite) TestPingStream_FullDuplexWorks() {
407407
resp, err := stream.Recv()
408408
s.Require().NoError(err)
409409

410-
s.Assert().Len(resp.Response, 1)
411-
s.Assert().EqualValues(i, resp.Response[0].Counter, "ping roundtrip must succeed with the correct id")
412-
s.Assert().EqualValues(resp.Response[0].Metadata.Hostname, resp.Response[0].Server)
410+
s.Assert().EqualValues(i, resp.Counter, "ping roundtrip must succeed with the correct id")
411+
s.Assert().EqualValues(resp.Metadata.Hostname, resp.Server)
413412

414-
delete(expectedUpstreams, resp.Response[0].Metadata.Hostname)
413+
delete(expectedUpstreams, resp.Metadata.Hostname)
415414
}
416415

417416
s.Require().Empty(expectedUpstreams)
@@ -464,24 +463,24 @@ func (s *ProxyOne2ManySuite) TestPingStream_FullDuplexConcurrent() {
464463
return err
465464
}
466465

467-
if len(resp.Response) != 1 {
468-
return fmt.Errorf("single response expected: %d", len(resp.Response))
466+
if resp.Metadata == nil {
467+
return fmt.Errorf("response metadata expected: %v", resp)
469468
}
470469

471-
if resp.Response[0].Metadata.Hostname != resp.Response[0].Server {
472-
return fmt.Errorf("mismatch on host metadata: %v != %v", resp.Response[0].Metadata.Hostname, resp.Response[0].Server)
470+
if resp.Metadata.Hostname != resp.Server {
471+
return fmt.Errorf("mismatch on host metadata: %v != %v", resp.Metadata.Hostname, resp.Server)
473472
}
474473

475-
expectedCounter, ok := expectedUpstreams[resp.Response[0].Server]
474+
expectedCounter, ok := expectedUpstreams[resp.Server]
476475
if !ok {
477-
return fmt.Errorf("unexpected host: %v", resp.Response[0].Server)
476+
return fmt.Errorf("unexpected host: %v", resp.Server)
478477
}
479478

480-
if expectedCounter != resp.Response[0].Counter {
481-
return fmt.Errorf("unexpected counter value: %d != %d", expectedCounter, resp.Response[0].Counter)
479+
if expectedCounter != resp.Counter {
480+
return fmt.Errorf("unexpected counter value: %d != %d", expectedCounter, resp.Counter)
482481
}
483482

484-
expectedUpstreams[resp.Response[0].Server]++
483+
expectedUpstreams[resp.Server]++
485484
}
486485

487486
return nil

proxy/handler_one2one.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,6 @@ func (s *handler) forwardClientToServer(src *backendConnection, dst grpc.ServerS
6464
break
6565
}
6666

67-
var err error
68-
f.payload, err = src.backend.AppendInfo(f.payload)
69-
if err != nil {
70-
ret <- err
71-
break
72-
}
73-
7467
if i == 0 {
7568
// This is a bit of a hack, but client to server headers are only readable after first client msg is
7669
// received but must be written to server stream before the first msg is flushed.

0 commit comments

Comments
 (0)