Skip to content

Commit 4407051

Browse files
committed
Merge remote-tracking branch 'origin/main' into leak-stacks
2 parents 83768a6 + 70a38c2 commit 4407051

12 files changed

+468
-439
lines changed

answer.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -156,28 +156,29 @@ func (p *Promise) Resolve(r Ptr, e error) {
156156
}
157157
p.caller = nil
158158

159-
if len(p.clients) > 0 || p.ongoingCalls > 0 {
160-
// Pending resolution state: wait for clients to be fulfilled
161-
// and calls to have answers. p.clients cannot be touched in the
162-
// pending resolution state, so we have exclusive access to the
163-
// variable.
164-
if p.ongoingCalls > 0 {
165-
p.callsStopped = make(chan struct{})
166-
}
167-
syncutil.Without(&p.mu, func() {
168-
res := resolution{p.method, r, e}
169-
for path, cp := range p.clients {
170-
t := path.transform()
171-
cp.promise.fulfill(res.client(t))
172-
shutdownPromises = append(shutdownPromises, cp.promise)
173-
cp.promise = nil
174-
}
175-
if p.callsStopped != nil {
176-
<-p.callsStopped
177-
}
178-
})
159+
if p.ongoingCalls > 0 {
160+
p.callsStopped = make(chan struct{})
179161
}
162+
})
180163

164+
if len(p.clients) > 0 || p.ongoingCalls > 0 {
165+
// Pending resolution state: wait for clients to be fulfilled
166+
// and calls to have answers. p.clients cannot be touched in the
167+
// pending resolution state, so we have exclusive access to the
168+
// variable.
169+
res := resolution{p.method, r, e}
170+
for path, cp := range p.clients {
171+
t := path.transform()
172+
cp.promise.fulfill(res.client(t))
173+
shutdownPromises = append(shutdownPromises, cp.promise)
174+
cp.promise = nil
175+
}
176+
if p.callsStopped != nil {
177+
<-p.callsStopped
178+
}
179+
}
180+
181+
syncutil.With(&p.mu, func() {
181182
// Move p into resolved state.
182183
p.callsStopped = nil
183184
p.result, p.err = r, e

rpc/answer.go

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ type answer struct {
3030
// entry is a placeholder until the remote vat cancels the call.
3131
ret rpccp.Return
3232

33-
// sendMsg sends the return message. The caller MUST NOT hold ans.c.mu.
33+
// sendMsg sends the return message. The caller MUST NOT hold ans.c.lk.
3434
sendMsg func()
3535

3636
// msgReleaser releases the return message when its refcount hits zero.
37-
// The caller MUST NOT hold ans.c.mu.
37+
// The caller MUST NOT hold ans.c.lk.
3838
msgReleaser *rc.Releaser
3939

4040
// results is the memoized answer to ret.Results().
@@ -99,27 +99,27 @@ func errorAnswer(c *Conn, id answerID, err error) *answer {
9999
// all references to it are dropped; the caller is responsible for one reference. This will not
100100
// happen before the message is sent, as the returned send function retains a reference.
101101
func (c *Conn) newReturn(ctx context.Context) (_ rpccp.Return, sendMsg func(), _ *rc.Releaser, _ error) {
102-
msg, send, releaseMsg, err := c.transport.NewMessage()
102+
outMsg, err := c.transport.NewMessage()
103103
if err != nil {
104104
return rpccp.Return{}, nil, nil, rpcerr.Failedf("create return: %w", err)
105105
}
106-
ret, err := msg.NewReturn()
106+
ret, err := outMsg.Message.NewReturn()
107107
if err != nil {
108-
releaseMsg()
108+
outMsg.Release()
109109
return rpccp.Return{}, nil, nil, rpcerr.Failedf("create return: %w", err)
110110
}
111111

112112
// Before releasing the message, we need to wait both until it is sent and
113113
// until the local vat is done with it. We therefore implement a simple
114114
// ref-counting mechanism such that 'release' must be called twice before
115115
// 'releaseMsg' is called.
116-
releaser := rc.NewReleaser(2, releaseMsg)
116+
releaser := rc.NewReleaser(2, outMsg.Release)
117117

118118
return ret, func() {
119119
c.sender.Send(asyncSend{
120-
send: send,
120+
send: outMsg.Send,
121121
release: releaser.Decr,
122-
callback: func(err error) {
122+
onSent: func(err error) {
123123
if err != nil {
124124
c.er.ReportError(fmt.Errorf("send return: %w", err))
125125
}
@@ -129,11 +129,11 @@ func (c *Conn) newReturn(ctx context.Context) (_ rpccp.Return, sendMsg func(), _
129129
}
130130

131131
// setPipelineCaller sets ans.pcall to pcall if the answer has not
132-
// already returned. The caller MUST NOT hold ans.c.mu.
132+
// already returned. The caller MUST NOT hold ans.c.lk.
133133
//
134134
// This also sets ans.promise to a new promise, wrapping pcall.
135135
func (ans *answer) setPipelineCaller(m capnp.Method, pcall capnp.PipelineCaller) {
136-
syncutil.With(&ans.c.mu, func() {
136+
syncutil.With(&ans.c.lk, func() {
137137
if !ans.flags.Contains(resultsReady) {
138138
ans.pcall = pcall
139139
ans.promise = capnp.NewPromise(m, pcall)
@@ -181,12 +181,12 @@ func (ans *answer) setBootstrap(c capnp.Client) error {
181181

182182
// Return sends the return message.
183183
//
184-
// The caller MUST NOT hold ans.c.mu.
184+
// The caller MUST NOT hold ans.c.lk.
185185
func (ans *answer) Return(e error) {
186-
ans.c.mu.Lock()
186+
ans.c.lk.Lock()
187187
if e != nil {
188188
rl := ans.sendException(e)
189-
ans.c.mu.Unlock()
189+
ans.c.lk.Unlock()
190190
rl.release()
191191
ans.pcalls.Wait()
192192
ans.c.tasks.Done() // added by handleCall
@@ -202,13 +202,13 @@ func (ans *answer) Return(e error) {
202202
ans.c.er.ReportError(err)
203203
}
204204

205-
ans.c.mu.Unlock()
205+
ans.c.lk.Unlock()
206206
rl.release()
207207
ans.pcalls.Wait()
208208
return
209209
}
210210
}
211-
ans.c.mu.Unlock()
211+
ans.c.lk.Unlock()
212212
rl.release()
213213
ans.pcalls.Wait()
214214
ans.c.tasks.Done() // added by handleCall
@@ -219,7 +219,7 @@ func (ans *answer) Return(e error) {
219219
// Finish with releaseResultCaps set to true, then sendReturn returns
220220
// the number of references to be subtracted from each export.
221221
//
222-
// The caller MUST be holding onto ans.c.mu. sendReturn MUST NOT be
222+
// The caller MUST be holding onto ans.c.lk. sendReturn MUST NOT be
223223
// called if sendException was previously called.
224224
func (ans *answer) sendReturn() (releaseList, error) {
225225
ans.pcall = nil
@@ -252,13 +252,13 @@ func (ans *answer) sendReturn() (releaseList, error) {
252252
}
253253
ans.promise = nil
254254
}
255-
ans.c.mu.Unlock()
255+
ans.c.lk.Unlock()
256256
ans.sendMsg()
257257
if fin {
258-
ans.c.mu.Lock()
258+
ans.c.lk.Lock()
259259
return ans.destroy()
260260
}
261-
ans.c.mu.Lock()
261+
ans.c.lk.Lock()
262262
}
263263
ans.flags |= returnSent
264264
if !ans.flags.Contains(finishReceived) {
@@ -269,7 +269,7 @@ func (ans *answer) sendReturn() (releaseList, error) {
269269

270270
// sendException sends an exception on the answer's return message.
271271
//
272-
// The caller MUST be holding onto ans.c.mu. sendException MUST NOT
272+
// The caller MUST be holding onto ans.c.lk. sendException MUST NOT
273273
// be called if sendReturn was previously called.
274274
func (ans *answer) sendException(ex error) releaseList {
275275
ans.err = ex
@@ -286,7 +286,7 @@ func (ans *answer) sendException(ex error) releaseList {
286286
default:
287287
// Send exception.
288288
fin := ans.flags.Contains(finishReceived)
289-
ans.c.mu.Unlock()
289+
ans.c.lk.Unlock()
290290
if e, err := ans.ret.NewException(); err != nil {
291291
ans.c.er.ReportError(fmt.Errorf("send exception: %w", err))
292292
} else {
@@ -298,11 +298,11 @@ func (ans *answer) sendException(ex error) releaseList {
298298
}
299299
}
300300
if fin {
301-
ans.c.mu.Lock()
301+
ans.c.lk.Lock()
302302
rl, _ := ans.destroy()
303303
return rl
304304
}
305-
ans.c.mu.Lock()
305+
ans.c.lk.Lock()
306306
}
307307
ans.flags |= returnSent
308308
if !ans.flags.Contains(finishReceived) {
@@ -314,17 +314,19 @@ func (ans *answer) sendException(ex error) releaseList {
314314
return rl
315315
}
316316

317-
// destroy removes the answer from the table and returns the clients to
318-
// release. The answer must have sent a return and received a finish.
319-
// The caller must be holding onto ans.c.mu.
317+
// destroy removes the answer from the table and returns ReleaseFuncs to
318+
// run. The answer must have sent a return and received a finish.
319+
// The caller must be holding onto ans.c.lk.
320320
//
321321
// shutdown has its own strategy for cleaning up an answer.
322322
func (ans *answer) destroy() (releaseList, error) {
323-
defer syncutil.Without(&ans.c.mu, ans.msgReleaser.Decr)
324-
delete(ans.c.answers, ans.id)
323+
rl := releaseList{ans.msgReleaser.Decr}
324+
delete(ans.c.lk.answers, ans.id)
325325
if !ans.flags.Contains(releaseResultCapsFlag) || len(ans.exportRefs) == 0 {
326-
return nil, nil
326+
return rl, nil
327327

328328
}
329-
return ans.c.releaseExportRefs(ans.exportRefs)
329+
ret, err := ans.c.releaseExportRefs(ans.exportRefs)
330+
ret = append(ret, rl...)
331+
return ret, err
330332
}

rpc/export.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ func (c *Conn) clearExportID(m *capnp.Metadata) {
4141
}
4242

4343
// findExport returns the export entry with the given ID or nil if
44-
// couldn't be found.
44+
// couldn't be found. The caller must be holding c.mu
4545
func (c *Conn) findExport(id exportID) *expent {
46-
if int64(id) >= int64(len(c.exports)) {
46+
if int64(id) >= int64(len(c.lk.exports)) {
4747
return nil
4848
}
49-
return c.exports[id] // might be nil
49+
return c.lk.exports[id] // might be nil
5050
}
5151

5252
// releaseExport decreases the number of wire references to an export
@@ -63,8 +63,8 @@ func (c *Conn) releaseExport(id exportID, count uint32) (capnp.Client, error) {
6363
switch {
6464
case count == ent.wireRefs:
6565
client := ent.client
66-
c.exports[id] = nil
67-
c.exportID.remove(uint32(id))
66+
c.lk.exports[id] = nil
67+
c.lk.exportID.remove(uint32(id))
6868
metadata := client.State().Metadata
6969
syncutil.With(metadata, func() {
7070
c.clearExportID(metadata)
@@ -98,7 +98,7 @@ func (c *Conn) releaseExportRefs(refs map[exportID]uint32) (releaseList, error)
9898
if rl == nil {
9999
rl = make(releaseList, 0, n)
100100
}
101-
rl = append(rl, client)
101+
rl = append(rl, client.Release)
102102
n--
103103
}
104104
return rl, firstErr
@@ -116,7 +116,7 @@ func (c *Conn) sendCap(d rpccp.CapDescriptor, client capnp.Client) (_ exportID,
116116
state := client.State()
117117
bv := state.Brand.Value
118118
if ic, ok := bv.(*importClient); ok && ic.c == c {
119-
if ent := c.imports[ic.id]; ent != nil && ent.generation == ic.generation {
119+
if ent := c.lk.imports[ic.id]; ent != nil && ent.generation == ic.generation {
120120
d.SetReceiverHosted(uint32(ic.id))
121121
return 0, false, nil
122122
}
@@ -149,7 +149,7 @@ func (c *Conn) sendCap(d rpccp.CapDescriptor, client capnp.Client) (_ exportID,
149149
defer state.Metadata.Unlock()
150150
id, ok := c.findExportID(state.Metadata)
151151
if ok {
152-
ent := c.exports[id]
152+
ent := c.lk.exports[id]
153153
ent.wireRefs++
154154
d.SetSenderHosted(uint32(id))
155155
return id, true, nil
@@ -160,11 +160,11 @@ func (c *Conn) sendCap(d rpccp.CapDescriptor, client capnp.Client) (_ exportID,
160160
client: client.AddRef(),
161161
wireRefs: 1,
162162
}
163-
id = exportID(c.exportID.next())
164-
if int64(id) == int64(len(c.exports)) {
165-
c.exports = append(c.exports, ee)
163+
id = exportID(c.lk.exportID.next())
164+
if int64(id) == int64(len(c.lk.exports)) {
165+
c.lk.exports = append(c.lk.exports, ee)
166166
} else {
167-
c.exports[id] = ee
167+
c.lk.exports[id] = ee
168168
}
169169
c.setExportID(state.Metadata, id)
170170
d.SetSenderHosted(uint32(id))
@@ -217,28 +217,28 @@ type embargo struct {
217217
//
218218
// The caller must be holding onto c.mu.
219219
func (c *Conn) embargo(client capnp.Client) (embargoID, capnp.Client) {
220-
id := embargoID(c.embargoID.next())
220+
id := embargoID(c.lk.embargoID.next())
221221
e := &embargo{
222222
c: client,
223223
lifted: make(chan struct{}),
224224
}
225-
if int64(id) == int64(len(c.embargoes)) {
226-
c.embargoes = append(c.embargoes, e)
225+
if int64(id) == int64(len(c.lk.embargoes)) {
226+
c.lk.embargoes = append(c.lk.embargoes, e)
227227
} else {
228-
c.embargoes[id] = e
228+
c.lk.embargoes[id] = e
229229
}
230230
var c2 capnp.Client
231-
c2, c.embargoes[id].p = capnp.NewPromisedClient(c.embargoes[id])
231+
c2, c.lk.embargoes[id].p = capnp.NewPromisedClient(c.lk.embargoes[id])
232232
return id, c2
233233
}
234234

235235
// findEmbargo returns the embargo entry with the given ID or nil if
236-
// couldn't be found.
236+
// couldn't be found. Must be holding c.mu
237237
func (c *Conn) findEmbargo(id embargoID) *embargo {
238-
if int64(id) >= int64(len(c.embargoes)) {
238+
if int64(id) >= int64(len(c.lk.embargoes)) {
239239
return nil
240240
}
241-
return c.embargoes[id] // might be nil
241+
return c.lk.embargoes[id] // might be nil
242242
}
243243

244244
// lift disembargoes the client. It must be called only once.
@@ -308,13 +308,13 @@ func (sl *senderLoopback) buildDisembargo(msg rpccp.Message) error {
308308
return nil
309309
}
310310

311-
type releaseList []capnp.Client
311+
type releaseList []capnp.ReleaseFunc
312312

313313
func (rl releaseList) release() {
314-
for _, c := range rl {
315-
c.Release()
314+
for _, r := range rl {
315+
r()
316316
}
317317
for i := range rl {
318-
rl[i] = capnp.Client{}
318+
rl[i] = func() {}
319319
}
320320
}

0 commit comments

Comments
 (0)