Skip to content

Commit 57f1115

Browse files
authored
refactor(flow): use atomic.Int64 in Throttler (#169)
1 parent 386f6d8 commit 57f1115

File tree

2 files changed

+42
-21
lines changed

2 files changed

+42
-21
lines changed

flow/throttler.go

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,47 +8,64 @@ import (
88
"github.com/reugn/go-streams"
99
)
1010

11-
// ThrottleMode represents Throttler's processing behavior when its element
12-
// buffer overflows.
11+
// ThrottleMode defines the behavior of the Throttler when its internal buffer is full.
1312
type ThrottleMode int8
1413

1514
const (
16-
// Backpressure slows down upstream ingestion when the element buffer overflows.
15+
// Backpressure instructs the Throttler to block upstream ingestion when its internal
16+
// buffer is full. This effectively slows down the producer, preventing data loss
17+
// and ensuring all elements are eventually processed, albeit at a reduced rate. This
18+
// mode can cause upstream operations to block indefinitely if the downstream consumer
19+
// cannot keep up.
1720
Backpressure ThrottleMode = iota
18-
19-
// Discard drops incoming elements when the element buffer overflows.
21+
// Discard instructs the Throttler to drop incoming elements when its internal buffer
22+
// is full. This mode prioritizes maintaining the target throughput rate, even at the
23+
// cost of data loss. Elements are silently dropped without any indication to the
24+
// upstream producer. Use this mode when data loss is acceptable.
2025
Discard
2126
)
2227

2328
// Throttler limits the throughput to a specific number of elements per time unit.
2429
type Throttler struct {
25-
maxElements int64
2630
period time.Duration
2731
mode ThrottleMode
32+
maxElements int64
33+
counter atomic.Int64
34+
2835
in chan any
2936
out chan any
3037
quotaSignal chan struct{}
3138
done chan struct{}
32-
counter int64
3339
}
3440

3541
// Verify Throttler satisfies the Flow interface.
3642
var _ streams.Flow = (*Throttler)(nil)
3743

3844
// NewThrottler returns a new Throttler operator.
3945
//
46+
// The Throttler operator limits the rate at which elements are produced. It allows a
47+
// maximum of 'elements' number of elements to be processed within a specified 'period'
48+
// of time.
49+
//
4050
// elements is the maximum number of elements to be produced per the given period of time.
41-
// bufferSize specifies the buffer size for incoming elements.
42-
// mode specifies the processing behavior when the elements buffer overflows.
51+
// bufferSize is the size of the internal buffer for incoming elements. This buffer
52+
// temporarily holds elements waiting to be processed.
53+
// mode specifies the processing behavior when the internal elements buffer is full.
54+
// See [ThrottleMode] for available options.
4355
//
44-
// If elements or bufferSize are not positive, NewThrottler will panic.
56+
// If elements or bufferSize are not positive, or if mode is not a supported
57+
// ThrottleMode, NewThrottler will panic.
4558
func NewThrottler(elements int, period time.Duration, bufferSize int, mode ThrottleMode) *Throttler {
4659
if elements < 1 {
4760
panic(fmt.Sprintf("nonpositive elements number: %d", elements))
4861
}
4962
if bufferSize < 1 {
5063
panic(fmt.Sprintf("nonpositive buffer size: %d", bufferSize))
5164
}
65+
if mode != Discard && mode != Backpressure {
66+
panic(fmt.Sprintf("unsupported ThrottleMode: %d", mode))
67+
}
68+
5269
throttler := &Throttler{
5370
maxElements: int64(elements),
5471
period: period,
@@ -66,19 +83,19 @@ func NewThrottler(elements int, period time.Duration, bufferSize int, mode Throt
6683

6784
// quotaExceeded checks whether the quota per time unit has been exceeded.
6885
func (th *Throttler) quotaExceeded() bool {
69-
return atomic.LoadInt64(&th.counter) >= th.maxElements
86+
return th.counter.Load() >= th.maxElements
7087
}
7188

7289
// resetQuotaCounterLoop resets the throttler quota counter every th.period
73-
// and sends a release notification to the downstream processor.
90+
// and notifies the downstream processing goroutine of the quota reset.
7491
func (th *Throttler) resetQuotaCounterLoop() {
7592
ticker := time.NewTicker(th.period)
7693
defer ticker.Stop()
7794

7895
for {
7996
select {
8097
case <-ticker.C:
81-
atomic.StoreInt64(&th.counter, 0)
98+
th.counter.Store(0)
8299
th.notifyQuotaReset() // send quota reset
83100

84101
case <-th.done:
@@ -95,8 +112,8 @@ func (th *Throttler) notifyQuotaReset() {
95112
}
96113
}
97114

98-
// buffer starts buffering incoming elements.
99-
// If an unsupported ThrottleMode was specified, buffer will panic.
115+
// buffer buffers incoming elements from the in channel by sending them
116+
// to the out channel, adhering to the configured ThrottleMode.
100117
func (th *Throttler) buffer() {
101118
switch th.mode {
102119
case Discard:
@@ -110,8 +127,6 @@ func (th *Throttler) buffer() {
110127
for element := range th.in {
111128
th.out <- element
112129
}
113-
default:
114-
panic(fmt.Sprintf("Unsupported ThrottleMode: %d", th.mode))
115130
}
116131
close(th.out)
117132
}
@@ -139,15 +154,15 @@ func (th *Throttler) In() chan<- any {
139154
return th.in
140155
}
141156

142-
// streamPortioned streams elements to the next Inlet.
143-
// Subsequent processing of elements will be suspended when the quota limit is reached
144-
// until the next quota reset event.
157+
// streamPortioned streams elements to the given Inlet, enforcing a quota.
158+
// Elements are sent to inlet.In() until th.out is closed. If the quota is exceeded,
159+
// the function blocks until a quota reset signal is received on th.quotaSignal.
145160
func (th *Throttler) streamPortioned(inlet streams.Inlet) {
146161
for element := range th.out {
147162
if th.quotaExceeded() {
148163
<-th.quotaSignal // wait for quota reset
149164
}
150-
atomic.AddInt64(&th.counter, 1)
165+
th.counter.Add(1)
151166
inlet.In() <- element
152167
}
153168
close(th.done)

flow/throttler_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ func TestThrottler_NonPositiveBufferSize(t *testing.T) {
8989
})
9090
}
9191

92+
func TestThrottler_UnsupportedThrottleMode(t *testing.T) {
93+
assert.Panics(t, func() {
94+
flow.NewThrottler(2, time.Second, 1, flow.ThrottleMode(3))
95+
})
96+
}
97+
9298
func writeValues(in chan any) {
9399
inputValues := []string{"a", "b", "c", "d", "e", "f", "g"}
94100
ingestSlice(inputValues, in)

0 commit comments

Comments
 (0)