Skip to content

Commit 34de47d

Browse files
authored
Merge pull request #396 from zenhack/handleDisembargo-locking
Clean up/fix locking in handleDisembargo
2 parents 73ff1cb + 182cf37 commit 34de47d

File tree

1 file changed

+36
-40
lines changed

1 file changed

+36
-40
lines changed

rpc/rpc.go

Lines changed: 36 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,64 +1389,60 @@ func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo, release
13891389
return
13901390
}
13911391

1392-
client := iface.Client()
1393-
1394-
var ok bool
1395-
syncutil.Without(&c.lk, func() {
1396-
imp, ok = client.State().Brand.Value.(*importClient)
1397-
})
1398-
1399-
if !ok || imp.c != c {
1400-
client.Release()
1401-
err = rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
1402-
return
1403-
}
1404-
1405-
// TODO(maybe): check generation?
1392+
client = iface.Client()
14061393
})
14071394

14081395
if err != nil {
14091396
release()
14101397
return err
14111398
}
14121399

1400+
imp, ok := client.State().Brand.Value.(*importClient)
1401+
if !ok || imp.c != c {
1402+
client.Release()
1403+
return rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
1404+
}
1405+
// TODO(maybe): check generation?
1406+
14131407
// Since this Cap'n Proto RPC implementation does not send imports
14141408
// unless they are fully dequeued, we can just immediately loop back.
14151409
id := d.Context().SenderLoopback()
1416-
c.sendMessage(ctx, func(m rpccp.Message) error {
1417-
defer release()
1418-
defer client.Release()
1419-
1420-
d, err := m.NewDisembargo()
1421-
if err != nil {
1422-
return err
1423-
}
1410+
syncutil.With(&c.lk, func() {
1411+
c.sendMessage(ctx, func(m rpccp.Message) error {
1412+
d, err := m.NewDisembargo()
1413+
if err != nil {
1414+
return err
1415+
}
14241416

1425-
tgt, err := d.NewTarget()
1426-
if err != nil {
1427-
return err
1428-
}
1417+
tgt, err := d.NewTarget()
1418+
if err != nil {
1419+
return err
1420+
}
14291421

1430-
tgt.SetImportedCap(uint32(imp.id))
1431-
d.Context().SetReceiverLoopback(id)
1432-
return nil
1422+
tgt.SetImportedCap(uint32(imp.id))
1423+
d.Context().SetReceiverLoopback(id)
1424+
return nil
14331425

1434-
}, func(err error) {
1435-
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
1426+
}, func(err error) {
1427+
defer release()
1428+
defer client.Release()
1429+
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
1430+
})
14361431
})
14371432

14381433
default:
14391434
c.er.ReportError(fmt.Errorf("incoming disembargo: context %v not implemented", d.Context().Which()))
1440-
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
1441-
defer release()
1442-
1443-
if m, err = m.NewUnimplemented(); err == nil {
1444-
err = m.SetDisembargo(d)
1445-
}
1435+
syncutil.With(&c.lk, func() {
1436+
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
1437+
if m, err = m.NewUnimplemented(); err == nil {
1438+
err = m.SetDisembargo(d)
1439+
}
14461440

1447-
return
1448-
}, func(err error) {
1449-
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
1441+
return
1442+
}, func(err error) {
1443+
defer release()
1444+
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
1445+
})
14501446
})
14511447
}
14521448

0 commit comments

Comments
 (0)