Skip to content

Commit 228803c

Browse files
authored
p2p/enode: add support for naming iterator sources (#31779)
This adds support for naming the source iterators of FairMix, like so: mix.AddSource(enode.WithSourceName("mySource", iter)) The source that produced the latest node is returned by the new NodeSource method.
1 parent 52dbd20 commit 228803c

File tree

2 files changed

+109
-18
lines changed

2 files changed

+109
-18
lines changed

p2p/enode/iter.go

+61-18
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,35 @@ type Iterator interface {
3030
Close() // ends the iterator
3131
}
3232

33+
// SourceIterator represents a sequence of nodes like [Iterator]
34+
// Each node also has a named 'source'.
35+
type SourceIterator interface {
36+
Iterator
37+
NodeSource() string // source of current node
38+
}
39+
40+
// WithSource attaches a 'source name' to an iterator.
41+
func WithSourceName(name string, it Iterator) SourceIterator {
42+
return sourceIter{it, name}
43+
}
44+
45+
func ensureSourceIter(it Iterator) SourceIterator {
46+
if si, ok := it.(SourceIterator); ok {
47+
return si
48+
}
49+
return WithSourceName("", it)
50+
}
51+
52+
type sourceIter struct {
53+
Iterator
54+
name string
55+
}
56+
57+
// NodeSource implements IteratorSource.
58+
func (it sourceIter) NodeSource() string {
59+
return it.name
60+
}
61+
3362
// ReadNodes reads at most n nodes from the given iterator. The return value contains no
3463
// duplicates and no nil values. To prevent looping indefinitely for small repeating node
3564
// sequences, this function calls Next at most n times.
@@ -106,16 +135,16 @@ func (it *sliceIter) Close() {
106135
// Filter wraps an iterator such that Next only returns nodes for which
107136
// the 'check' function returns true.
108137
func Filter(it Iterator, check func(*Node) bool) Iterator {
109-
return &filterIter{it, check}
138+
return &filterIter{ensureSourceIter(it), check}
110139
}
111140

112141
type filterIter struct {
113-
Iterator
142+
SourceIterator
114143
check func(*Node) bool
115144
}
116145

117146
func (f *filterIter) Next() bool {
118-
for f.Iterator.Next() {
147+
for f.SourceIterator.Next() {
119148
if f.check(f.Node()) {
120149
return true
121150
}
@@ -135,9 +164,9 @@ func (f *filterIter) Next() bool {
135164
// It's safe to call AddSource and Close concurrently with Next.
136165
type FairMix struct {
137166
wg sync.WaitGroup
138-
fromAny chan *Node
167+
fromAny chan mixItem
139168
timeout time.Duration
140-
cur *Node
169+
cur mixItem
141170

142171
mu sync.Mutex
143172
closed chan struct{}
@@ -146,11 +175,16 @@ type FairMix struct {
146175
}
147176

148177
type mixSource struct {
149-
it Iterator
150-
next chan *Node
178+
it SourceIterator
179+
next chan mixItem
151180
timeout time.Duration
152181
}
153182

183+
type mixItem struct {
184+
n *Node
185+
source string
186+
}
187+
154188
// NewFairMix creates a mixer.
155189
//
156190
// The timeout specifies how long the mixer will wait for the next fairly-chosen source
@@ -159,7 +193,7 @@ type mixSource struct {
159193
// timeout makes the mixer completely fair.
160194
func NewFairMix(timeout time.Duration) *FairMix {
161195
m := &FairMix{
162-
fromAny: make(chan *Node),
196+
fromAny: make(chan mixItem),
163197
closed: make(chan struct{}),
164198
timeout: timeout,
165199
}
@@ -175,7 +209,11 @@ func (m *FairMix) AddSource(it Iterator) {
175209
return
176210
}
177211
m.wg.Add(1)
178-
source := &mixSource{it, make(chan *Node), m.timeout}
212+
source := &mixSource{
213+
it: ensureSourceIter(it),
214+
next: make(chan mixItem),
215+
timeout: m.timeout,
216+
}
179217
m.sources = append(m.sources, source)
180218
go m.runSource(m.closed, source)
181219
}
@@ -201,7 +239,7 @@ func (m *FairMix) Close() {
201239

202240
// Next returns a node from a random source.
203241
func (m *FairMix) Next() bool {
204-
m.cur = nil
242+
m.cur = mixItem{}
205243

206244
for {
207245
source := m.pickSource()
@@ -217,12 +255,12 @@ func (m *FairMix) Next() bool {
217255
}
218256

219257
select {
220-
case n, ok := <-source.next:
258+
case item, ok := <-source.next:
221259
if ok {
222260
// Here, the timeout is reset to the configured value
223261
// because the source delivered a node.
224262
source.timeout = m.timeout
225-
m.cur = n
263+
m.cur = item
226264
return true
227265
}
228266
// This source has ended.
@@ -239,15 +277,20 @@ func (m *FairMix) Next() bool {
239277

240278
// Node returns the current node.
241279
func (m *FairMix) Node() *Node {
242-
return m.cur
280+
return m.cur.n
281+
}
282+
283+
// NodeSource returns the current node's source name.
284+
func (m *FairMix) NodeSource() string {
285+
return m.cur.source
243286
}
244287

245288
// nextFromAny is used when there are no sources or when the 'fair' choice
246289
// doesn't turn up a node quickly enough.
247290
func (m *FairMix) nextFromAny() bool {
248-
n, ok := <-m.fromAny
291+
item, ok := <-m.fromAny
249292
if ok {
250-
m.cur = n
293+
m.cur = item
251294
}
252295
return ok
253296
}
@@ -284,10 +327,10 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
284327
defer m.wg.Done()
285328
defer close(s.next)
286329
for s.it.Next() {
287-
n := s.it.Node()
330+
item := mixItem{s.it.Node(), s.it.NodeSource()}
288331
select {
289-
case s.next <- n:
290-
case m.fromAny <- n:
332+
case s.next <- item:
333+
case m.fromAny <- item:
291334
case <-closed:
292335
return
293336
}

p2p/enode/iter_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package enode
1919
import (
2020
"encoding/binary"
2121
"runtime"
22+
"slices"
2223
"sync/atomic"
2324
"testing"
2425
"time"
@@ -183,6 +184,53 @@ func TestFairMixRemoveSource(t *testing.T) {
183184
}
184185
}
185186

187+
// This checks that FairMix correctly returns the name of the source that produced the node.
188+
func TestFairMixSourceName(t *testing.T) {
189+
nodes := make([]*Node, 6)
190+
for i := range nodes {
191+
nodes[i] = testNode(uint64(i), uint64(i))
192+
}
193+
mix := NewFairMix(-1)
194+
mix.AddSource(WithSourceName("s1", IterNodes(nodes[0:2])))
195+
mix.AddSource(WithSourceName("s2", IterNodes(nodes[2:4])))
196+
mix.AddSource(WithSourceName("s3", IterNodes(nodes[4:6])))
197+
198+
var names []string
199+
for range nodes {
200+
mix.Next()
201+
names = append(names, mix.NodeSource())
202+
}
203+
want := []string{"s2", "s3", "s1", "s2", "s3", "s1"}
204+
if !slices.Equal(names, want) {
205+
t.Fatalf("wrong names: %v", names)
206+
}
207+
}
208+
209+
// This checks that FairMix returns the name of the source that produced the node,
210+
// even when FairMix instances are nested.
211+
func TestFairMixNestedSourceName(t *testing.T) {
212+
nodes := make([]*Node, 6)
213+
for i := range nodes {
214+
nodes[i] = testNode(uint64(i), uint64(i))
215+
}
216+
mix := NewFairMix(-1)
217+
mix.AddSource(WithSourceName("s1", IterNodes(nodes[0:2])))
218+
submix := NewFairMix(-1)
219+
submix.AddSource(WithSourceName("s2", IterNodes(nodes[2:4])))
220+
submix.AddSource(WithSourceName("s3", IterNodes(nodes[4:6])))
221+
mix.AddSource(submix)
222+
223+
var names []string
224+
for range nodes {
225+
mix.Next()
226+
names = append(names, mix.NodeSource())
227+
}
228+
want := []string{"s3", "s1", "s2", "s1", "s3", "s2"}
229+
if !slices.Equal(names, want) {
230+
t.Fatalf("wrong names: %v", names)
231+
}
232+
}
233+
186234
type blockingIter chan struct{}
187235

188236
func (it blockingIter) Next() bool {

0 commit comments

Comments
 (0)