Skip to content

Commit edb8837

Browse files
committed
Merge branch 'kill-bootstrapClient' into simplify-clientHook-refcounts
2 parents 7732544 + ad4f48e commit edb8837

File tree

4 files changed

+60
-65
lines changed

4 files changed

+60
-65
lines changed

capability.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -853,8 +853,8 @@ type ClientHook interface {
853853

854854
// Shutdown releases any resources associated with this capability.
855855
// The behavior of calling any methods on the receiver after calling
856-
// Shutdown is undefined. It is expected for the ClientHook to reject
857-
// any outstanding call futures.
856+
// Shutdown is undefined. Any already-outstanding calls should not
857+
// be interrupted.
858858
Shutdown()
859859

860860
// String formats the hook as a string (same as fmt.Stringer)

rpc/level0_test.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -807,8 +807,35 @@ func TestSendBootstrapPipelineCall(t *testing.T) {
807807
}
808808
}
809809

810-
// 6. Release the client, read the finish.
811-
client.Release()
810+
// 6. Send back a return for the bootstrap message:
811+
bootstrapExportID := uint32(99)
812+
{
813+
outMsg, err := p2.NewMessage()
814+
require.NoError(t, err)
815+
iface := capnp.NewInterface(outMsg.Message().Segment(), 0)
816+
require.NoError(t, pogs.Insert(rpccp.Message_TypeID, capnp.Struct(outMsg.Message()),
817+
&rpcMessage{
818+
Which: rpccp.Message_Which_return,
819+
Return: &rpcReturn{
820+
AnswerID: bootstrapQID,
821+
Which: rpccp.Return_Which_results,
822+
Results: &rpcPayload{
823+
Content: iface.ToPtr(),
824+
CapTable: []rpcCapDescriptor{
825+
{
826+
Which: rpccp.CapDescriptor_Which_senderHosted,
827+
SenderHosted: bootstrapExportID,
828+
},
829+
},
830+
},
831+
},
832+
},
833+
))
834+
require.NoError(t, outMsg.Send())
835+
outMsg.Release()
836+
}
837+
838+
// 7. Read the finish:
812839
{
813840
rmsg, release, err := recvMessage(ctx, p2)
814841
if err != nil {
@@ -821,9 +848,22 @@ func TestSendBootstrapPipelineCall(t *testing.T) {
821848
if rmsg.Finish.QuestionID != bootstrapQID {
822849
t.Errorf("Received finish for question %d; want %d", rmsg.Finish.QuestionID, bootstrapQID)
823850
}
824-
if !rmsg.Finish.ReleaseResultCaps {
825-
t.Error("Received finish that does not release bootstrap")
826-
}
851+
require.False(
852+
t,
853+
rmsg.Finish.ReleaseResultCaps,
854+
"Received finish that releases bootstrap (should receive separate releasemessage)",
855+
)
856+
}
857+
858+
// 8. Release the client, read the release message.
859+
client.Release()
860+
{
861+
rmsg, release, err := recvMessage(ctx, p2)
862+
require.NoError(t, err)
863+
defer release()
864+
require.Equal(t, rpccp.Message_Which_release, rmsg.Which)
865+
require.Equal(t, bootstrapExportID, rmsg.Release.ID)
866+
require.Equal(t, uint32(1), rmsg.Release.ReferenceCount)
827867
}
828868
}
829869

rpc/question.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ type question struct {
1515
c *Conn
1616
id questionID
1717

18-
bootstrapPromise capnp.Resolver[capnp.Client]
19-
2018
p *capnp.Promise
2119
release capnp.ReleaseFunc // written before resolving p
2220

@@ -127,12 +125,7 @@ func (q *question) handleCancel(ctx context.Context) {
127125
q.c.er.ReportError(rpcerr.Annotate(err, "send finish"))
128126
}
129127
close(q.finishMsgSend)
130-
131128
q.p.Reject(rejectErr)
132-
if q.bootstrapPromise != nil {
133-
q.bootstrapPromise.Reject(rejectErr)
134-
q.p.ReleaseClients()
135-
}
136129
})
137130
})
138131
}
@@ -278,14 +271,8 @@ func (q *question) mark(xform []capnp.PipelineOp) {
278271
}
279272

280273
func (q *question) Reject(err error) {
281-
if q != nil {
282-
if q.bootstrapPromise != nil {
283-
q.bootstrapPromise.Fulfill(capnp.ErrorClient(err))
284-
}
285-
286-
if q.p != nil {
287-
q.p.Reject(err)
288-
}
274+
if q != nil && q.p != nil {
275+
q.p.Reject(err)
289276
}
290277
}
291278

rpc/rpc.go

Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,12 @@ func (c *Conn) Bootstrap(ctx context.Context) (bc capnp.Client) {
308308
}
309309
defer c.tasks.Done()
310310

311-
bootCtx, cancel := context.WithCancel(ctx)
312311
q := c.newQuestion(capnp.Method{})
313-
bc, q.bootstrapPromise = capnp.NewPromisedClient(bootstrapClient{
314-
c: q.p.Answer().Client().AddRef(),
315-
cancel: cancel,
316-
})
312+
bc = q.p.Answer().Client().AddRef()
313+
go func() {
314+
q.p.ReleaseClients()
315+
q.release()
316+
}()
317317

318318
c.sendMessage(ctx, func(m rpccp.Message) error {
319319
boot, err := m.NewBootstrap()
@@ -327,7 +327,7 @@ func (c *Conn) Bootstrap(ctx context.Context) (bc capnp.Client) {
327327
syncutil.With(&c.lk, func() {
328328
c.lk.questions[q.id] = nil
329329
})
330-
q.bootstrapPromise.Reject(exc.Annotate("rpc", "bootstrap", err))
330+
q.p.Reject(exc.Annotate("rpc", "bootstrap", err))
331331
syncutil.With(&c.lk, func() {
332332
c.lk.questionID.remove(uint32(q.id))
333333
})
@@ -337,40 +337,14 @@ func (c *Conn) Bootstrap(ctx context.Context) (bc capnp.Client) {
337337
c.tasks.Add(1)
338338
go func() {
339339
defer c.tasks.Done()
340-
q.handleCancel(bootCtx)
340+
q.handleCancel(ctx)
341341
}()
342342
})
343343

344344
return
345345
})
346346
}
347347

348-
type bootstrapClient struct {
349-
c capnp.Client
350-
cancel context.CancelFunc
351-
}
352-
353-
func (bc bootstrapClient) String() string {
354-
return "bootstrapClient{c: " + bc.c.String() + "}"
355-
}
356-
357-
func (bc bootstrapClient) Send(ctx context.Context, s capnp.Send) (*capnp.Answer, capnp.ReleaseFunc) {
358-
return bc.c.SendCall(ctx, s)
359-
}
360-
361-
func (bc bootstrapClient) Recv(ctx context.Context, r capnp.Recv) capnp.PipelineCaller {
362-
return bc.c.RecvCall(ctx, r)
363-
}
364-
365-
func (bc bootstrapClient) Brand() capnp.Brand {
366-
return bc.c.State().Brand
367-
}
368-
369-
func (bc bootstrapClient) Shutdown() {
370-
bc.cancel()
371-
bc.c.Release()
372-
}
373-
374348
// Close sends an abort to the remote vat and closes the underlying
375349
// transport.
376350
func (c *Conn) Close() error {
@@ -1151,23 +1125,17 @@ func (c *Conn) handleReturn(ctx context.Context, in transport.IncomingMessage) e
11511125
c.er.ReportError(rpcerr.Annotate(pr.err, "incoming return"))
11521126
}
11531127

1154-
if q.bootstrapPromise == nil && pr.err == nil {
1155-
// The result of the message contains actual data (not just a
1156-
// client or an error), so we save the ReleaseFunc for later:
1128+
if pr.err == nil {
1129+
// The result of the message contains actual data (not just
1130+
// an error), so we save the ReleaseFunc for later:
11571131
q.release = in.Release
11581132
}
11591133
// We're going to potentially block fulfilling some promises so fork
11601134
// off a goroutine to avoid blocking the receive loop.
11611135
go func() {
11621136
c := unlockedConn
11631137
q.p.Resolve(pr.result, pr.err)
1164-
if q.bootstrapPromise != nil {
1165-
q.bootstrapPromise.Fulfill(q.p.Answer().Client())
1166-
q.p.ReleaseClients()
1167-
// We can release now; root pointer of the result is a client, so the
1168-
// message won't be accessed:
1169-
in.Release()
1170-
} else if pr.err != nil {
1138+
if pr.err != nil {
11711139
// We can release now; the result is an error, so data from the message
11721140
// won't be accessed:
11731141
in.Release()

0 commit comments

Comments
 (0)