Skip to content

Commit 88d3b1e

Browse files
committed
Merge remote-tracking branch 'origin/main' into rc-pkg
2 parents 7fbf9c3 + c434d9f commit 88d3b1e

File tree

1 file changed

+7
-51
lines changed

1 file changed

+7
-51
lines changed

answer.go

Lines changed: 7 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,6 @@ type promiseState struct {
5353
// the promise leaves the unresolved state.
5454
caller PipelineCaller
5555

56-
// ongoingCalls counts the number of calls to caller that have not
57-
// yielded an Answer yet (but not necessarily finished).
58-
ongoingCalls int
59-
// If callsStopped is non-nil, then the promise has entered into
60-
// the pending state and is waiting for ongoingCalls to drop to zero.
61-
// After decrementing ongoingCalls, callsStopped should be closed if
62-
// ongoingCalls is zero to wake up the goroutine.
63-
//
64-
// Only Fulfill or Reject will set callsStopped.
65-
callsStopped chan struct{}
66-
6756
// clients is a table of promised clients created to proxy the eventual
6857
// result's clients. Even after resolution, this table may still have
6958
// entries until the clients are released. Cannot be read or written
@@ -161,30 +150,17 @@ func (p *Promise) Resolve(r Ptr, e error) {
161150
dq := &deferred.Queue{}
162151
defer dq.Run()
163152

164-
var (
165-
// We need to access some of these fields from p.state while
166-
// not holding the lock, so we store them here while holding it.
167-
// p.clients cannot be touched in the pending resolution state,
168-
// so we have exclusive access to the variable anyway.
169-
clients map[clientPath]*clientAndPromise
170-
callsStopped chan struct{}
171-
)
172-
173-
p.state.With(func(p *promiseState) {
153+
// It's ok to extract p.clients and use it while not holding the lock:
154+
// it may not be accessed in the pending resolution state, so we have
155+
// exclusive access to the variable anyway.
156+
clients := mutex.With1(&p.state, func(p *promiseState) map[clientPath]*clientAndPromise {
174157
if e != nil {
175158
p.requireUnresolved("Reject")
176159
} else {
177160
p.requireUnresolved("Fulfill")
178161
}
179162
p.caller = nil
180-
181-
if p.ongoingCalls > 0 {
182-
p.callsStopped = make(chan struct{})
183-
}
184-
185-
if len(p.clients) > 0 || p.ongoingCalls > 0 {
186-
clients = p.clients
187-
}
163+
return p.clients
188164
})
189165

190166
// Pending resolution state: wait for clients to be fulfilled
@@ -195,13 +171,9 @@ func (p *Promise) Resolve(r Ptr, e error) {
195171
cp.promise.fulfill(dq, res.client(t))
196172
cp.promise = nil
197173
}
198-
if callsStopped != nil {
199-
<-callsStopped
200-
}
201174

202175
p.state.With(func(p *promiseState) {
203176
// Move p into resolved state.
204-
p.callsStopped = nil
205177
p.result, p.err = r, e
206178
for _, f := range p.signals {
207179
f()
@@ -351,17 +323,9 @@ func (ans *Answer) PipelineSend(ctx context.Context, transform []PipelineOp, s S
351323
l := p.state.Lock()
352324
switch {
353325
case l.Value().isUnresolved():
354-
l.Value().ongoingCalls++
355326
caller := l.Value().caller
356327
l.Unlock()
357-
ans, release := caller.PipelineSend(ctx, transform, s)
358-
p.state.With(func(p *promiseState) {
359-
p.ongoingCalls--
360-
if p.ongoingCalls == 0 && p.callsStopped != nil {
361-
close(p.callsStopped)
362-
}
363-
})
364-
return ans, release
328+
return caller.PipelineSend(ctx, transform, s)
365329
case l.Value().isPendingResolution():
366330
// Block new calls until resolved.
367331
l.Unlock()
@@ -387,17 +351,9 @@ func (ans *Answer) PipelineRecv(ctx context.Context, transform []PipelineOp, r R
387351
l := p.state.Lock()
388352
switch {
389353
case l.Value().isUnresolved():
390-
l.Value().ongoingCalls++
391354
caller := l.Value().caller
392355
l.Unlock()
393-
pcall := caller.PipelineRecv(ctx, transform, r)
394-
p.state.With(func(p *promiseState) {
395-
p.ongoingCalls--
396-
if p.ongoingCalls == 0 && p.callsStopped != nil {
397-
close(p.callsStopped)
398-
}
399-
})
400-
return pcall
356+
return caller.PipelineRecv(ctx, transform, r)
401357
case l.Value().isPendingResolution():
402358
// Block new calls until resolved.
403359
l.Unlock()

0 commit comments

Comments
 (0)