diff --git a/capability.go b/capability.go index 278a2d0a..f372f1fb 100644 --- a/capability.go +++ b/capability.go @@ -854,8 +854,8 @@ type ClientHook interface { // Shutdown releases any resources associated with this capability. // The behavior of calling any methods on the receiver after calling - // Shutdown is undefined. It is expected for the ClientHook to reject - // any outstanding call futures. + // Shutdown is undefined. Shutdown must not interrupt any already + // outstanding calls. Shutdown() // String formats the hook as a string (same as fmt.Stringer) diff --git a/rpc/level0_test.go b/rpc/level0_test.go index eb0340f8..a103274a 100644 --- a/rpc/level0_test.go +++ b/rpc/level0_test.go @@ -807,8 +807,35 @@ func TestSendBootstrapPipelineCall(t *testing.T) { } } - // 6. Release the client, read the finish. - client.Release() + // 6. Send back a return for the bootstrap message: + bootstrapExportID := uint32(99) + { + outMsg, err := p2.NewMessage() + require.NoError(t, err) + iface := capnp.NewInterface(outMsg.Message().Segment(), 0) + require.NoError(t, pogs.Insert(rpccp.Message_TypeID, capnp.Struct(outMsg.Message()), + &rpcMessage{ + Which: rpccp.Message_Which_return, + Return: &rpcReturn{ + AnswerID: bootstrapQID, + Which: rpccp.Return_Which_results, + Results: &rpcPayload{ + Content: iface.ToPtr(), + CapTable: []rpcCapDescriptor{ + { + Which: rpccp.CapDescriptor_Which_senderHosted, + SenderHosted: bootstrapExportID, + }, + }, + }, + }, + }, + )) + require.NoError(t, outMsg.Send()) + outMsg.Release() + } + + // 7. Read the finish: { rmsg, release, err := recvMessage(ctx, p2) if err != nil { @@ -821,9 +848,22 @@ func TestSendBootstrapPipelineCall(t *testing.T) { if rmsg.Finish.QuestionID != bootstrapQID { t.Errorf("Received finish for question %d; want %d", rmsg.Finish.QuestionID, bootstrapQID) } - if !rmsg.Finish.ReleaseResultCaps { - t.Error("Received finish that does not release bootstrap") - } + require.False( + t, + rmsg.Finish.ReleaseResultCaps, + "Received finish that releases bootstrap (should receive separate releasemessage)", + ) + } + + // 8. Release the client, read the release message. + client.Release() + { + rmsg, release, err := recvMessage(ctx, p2) + require.NoError(t, err) + defer release() + require.Equal(t, rpccp.Message_Which_release, rmsg.Which) + require.Equal(t, bootstrapExportID, rmsg.Release.ID) + require.Equal(t, uint32(1), rmsg.Release.ReferenceCount) } } diff --git a/rpc/question.go b/rpc/question.go index 4a079a18..458ba229 100644 --- a/rpc/question.go +++ b/rpc/question.go @@ -15,8 +15,6 @@ type question struct { c *Conn id questionID - bootstrapPromise capnp.Resolver[capnp.Client] - p *capnp.Promise release capnp.ReleaseFunc // written before resolving p @@ -127,12 +125,7 @@ func (q *question) handleCancel(ctx context.Context) { q.c.er.ReportError(rpcerr.Annotate(err, "send finish")) } close(q.finishMsgSend) - q.p.Reject(rejectErr) - if q.bootstrapPromise != nil { - q.bootstrapPromise.Reject(rejectErr) - q.p.ReleaseClients() - } }) }) } @@ -278,14 +271,8 @@ func (q *question) mark(xform []capnp.PipelineOp) { } func (q *question) Reject(err error) { - if q != nil { - if q.bootstrapPromise != nil { - q.bootstrapPromise.Fulfill(capnp.ErrorClient(err)) - } - - if q.p != nil { - q.p.Reject(err) - } + if q != nil && q.p != nil { + q.p.Reject(err) } } diff --git a/rpc/rpc.go b/rpc/rpc.go index f624c44d..9f6936bf 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -308,12 +308,12 @@ func (c *Conn) Bootstrap(ctx context.Context) (bc capnp.Client) { } defer c.tasks.Done() - bootCtx, cancel := context.WithCancel(ctx) q := c.newQuestion(capnp.Method{}) - bc, q.bootstrapPromise = capnp.NewPromisedClient(bootstrapClient{ - c: q.p.Answer().Client().AddRef(), - cancel: cancel, - }) + bc = q.p.Answer().Client().AddRef() + go func() { + q.p.ReleaseClients() + q.release() + }() c.sendMessage(ctx, func(m rpccp.Message) error { boot, err := m.NewBootstrap() @@ -327,7 +327,7 @@ func (c *Conn) Bootstrap(ctx context.Context) (bc capnp.Client) { syncutil.With(&c.lk, func() { c.lk.questions[q.id] = nil }) - q.bootstrapPromise.Reject(exc.Annotate("rpc", "bootstrap", err)) + q.p.Reject(exc.Annotate("rpc", "bootstrap", err)) syncutil.With(&c.lk, func() { c.lk.questionID.remove(uint32(q.id)) }) @@ -337,7 +337,7 @@ func (c *Conn) Bootstrap(ctx context.Context) (bc capnp.Client) { c.tasks.Add(1) go func() { defer c.tasks.Done() - q.handleCancel(bootCtx) + q.handleCancel(ctx) }() }) @@ -345,32 +345,6 @@ func (c *Conn) Bootstrap(ctx context.Context) (bc capnp.Client) { }) } -type bootstrapClient struct { - c capnp.Client - cancel context.CancelFunc -} - -func (bc bootstrapClient) String() string { - return "bootstrapClient{c: " + bc.c.String() + "}" -} - -func (bc bootstrapClient) Send(ctx context.Context, s capnp.Send) (*capnp.Answer, capnp.ReleaseFunc) { - return bc.c.SendCall(ctx, s) -} - -func (bc bootstrapClient) Recv(ctx context.Context, r capnp.Recv) capnp.PipelineCaller { - return bc.c.RecvCall(ctx, r) -} - -func (bc bootstrapClient) Brand() capnp.Brand { - return bc.c.State().Brand -} - -func (bc bootstrapClient) Shutdown() { - bc.cancel() - bc.c.Release() -} - // Close sends an abort to the remote vat and closes the underlying // transport. func (c *Conn) Close() error { @@ -1151,9 +1125,9 @@ func (c *Conn) handleReturn(ctx context.Context, in transport.IncomingMessage) e c.er.ReportError(rpcerr.Annotate(pr.err, "incoming return")) } - if q.bootstrapPromise == nil && pr.err == nil { - // The result of the message contains actual data (not just a - // client or an error), so we save the ReleaseFunc for later: + if pr.err == nil { + // The result of the message contains actual data (not just + // an error), so we save the ReleaseFunc for later: q.release = in.Release } // We're going to potentially block fulfilling some promises so fork @@ -1161,13 +1135,7 @@ func (c *Conn) handleReturn(ctx context.Context, in transport.IncomingMessage) e go func() { c := unlockedConn q.p.Resolve(pr.result, pr.err) - if q.bootstrapPromise != nil { - q.bootstrapPromise.Fulfill(q.p.Answer().Client()) - q.p.ReleaseClients() - // We can release now; root pointer of the result is a client, so the - // message won't be accessed: - in.Release() - } else if pr.err != nil { + if pr.err != nil { // We can release now; the result is an error, so data from the message // won't be accessed: in.Release()