Skip to content

Commit b448808

Browse files
committed
Priority preparation...
1 parent 8d82a57 commit b448808

File tree

5 files changed

+40
-36
lines changed

5 files changed

+40
-36
lines changed

pkg/node/db_write_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (wh *DbWriteHandler) Handle(conn transports.Connection, frame []byte) {
150150
if wh.distributor != nil {
151151
// Use high priority for writes coming from direct client requests
152152
go func() {
153-
if err := wh.distributor.DistributeRecord(key, value, PriorityHigh, TargetAll); err != nil {
153+
if err := wh.distributor.DistributeRecord(key, value, types.PriorityHigh, types.TargetAll); err != nil {
154154
wh.logger.Error("Failed to distribute record",
155155
zap.Error(err),
156156
zap.Binary("key", key[:]),

pkg/node/node.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package node
22

33
import (
44
"context"
5+
"github.com/unpackdev/fdb/pkg/types"
56

67
"github.com/libp2p/go-libp2p/core/peer"
78
"github.com/unpackdev/fdb/pkg/accounts"
@@ -245,17 +246,17 @@ func (n *Node) Distributor() *P2PDistributor {
245246

246247
// DistributeRecord adds a record to be distributed across the P2P network
247248
func (n *Node) DistributeRecord(key [32]byte, value []byte) error {
248-
return n.distributor.DistributeRecord(key, value, PriorityNormal, TargetAll)
249+
return n.distributor.DistributeRecord(key, value, types.PriorityNormal, types.TargetAll)
249250
}
250251

251252
// DistributeRecordWithPriority adds a record with specified priority and target
252-
func (n *Node) DistributeRecordWithPriority(key [32]byte, value []byte, priority Priority, target Target) error {
253+
func (n *Node) DistributeRecordWithPriority(key [32]byte, value []byte, priority types.Priority, target types.Target) error {
253254
return n.distributor.DistributeRecord(key, value, priority, target)
254255
}
255256

256257
// DistributeRecordToPeer sends a record directly to a specific peer
257258
func (n *Node) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID) error {
258-
return n.distributor.DistributeRecordToPeer(key, value, peerID, PriorityNormal)
259+
return n.distributor.DistributeRecordToPeer(key, value, peerID, types.PriorityNormal)
259260
}
260261

261262
func (n *Node) Start() error {

pkg/node/p2p_distributor.go

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package node
22

33
import (
44
"context"
5+
"github.com/unpackdev/fdb/pkg/types"
56
"sync"
67
"time"
78

@@ -25,24 +26,6 @@ const (
2526
// P2PDistributorStateType is the type identifier for P2PDistributor state
2627
const P2PDistributorStateType state.StateType = "p2p_distributor"
2728

28-
// Priority represents the importance level of a record batch
29-
type Priority uint8
30-
31-
const (
32-
PriorityHigh Priority = iota
33-
PriorityNormal
34-
PriorityLow
35-
)
36-
37-
// Target specifies the distribution target type
38-
type Target uint8
39-
40-
const (
41-
TargetAll Target = iota
42-
TargetValidators
43-
TargetDirectPeer
44-
)
45-
4629
// DistributionStats tracks performance metrics
4730
type DistributionStats struct {
4831
RecordsDistributed int64
@@ -56,8 +39,8 @@ type DistributionStats struct {
5639
// RecordBatch represents a batch of records to be distributed
5740
type RecordBatch struct {
5841
Records []db.WriteRequest
59-
Priority Priority
60-
Target Target
42+
Priority types.Priority
43+
Target types.Target
6144
TargetPeer *peer.ID // Only set if Target is TargetDirectPeer
6245
}
6346

@@ -439,13 +422,13 @@ func (d *P2PDistributor) distributeBatch(batch *RecordBatch) {
439422
zap.Uint8("priority", uint8(batch.Priority)))
440423

441424
switch batch.Target {
442-
case TargetAll:
425+
case types.TargetAll:
443426
d.logger.Debug("Distributing to all peers", zap.Int("record_count", recordCount))
444427
d.distributeToAllPeers(batch)
445-
case TargetValidators:
428+
case types.TargetValidators:
446429
d.logger.Debug("Distributing to validators", zap.Int("record_count", recordCount))
447430
d.distributeToValidators(batch)
448-
case TargetDirectPeer:
431+
case types.TargetDirectPeer:
449432
if batch.TargetPeer != nil {
450433
d.logger.Debug("Distributing to direct peer",
451434
zap.Int("record_count", recordCount),
@@ -458,7 +441,7 @@ func (d *P2PDistributor) distributeBatch(batch *RecordBatch) {
458441
}
459442

460443
// DistributeRecord adds a record to the appropriate distribution queue
461-
func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority Priority, target Target) error {
444+
func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority types.Priority, target types.Target) error {
462445
// Create a deep copy of the value when a record first enters the system
463446
valueCopy := make([]byte, len(value))
464447
copy(valueCopy, value)
@@ -477,11 +460,11 @@ func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority P
477460
// Queue based on priority
478461
var queue chan *RecordBatch
479462
switch priority {
480-
case PriorityHigh:
463+
case types.PriorityHigh:
481464
queue = d.highPriorityQueue
482-
case PriorityNormal:
465+
case types.PriorityNormal:
483466
queue = d.normalQueue
484-
case PriorityLow:
467+
case types.PriorityLow:
485468
queue = d.lowPriorityQueue
486469
}
487470

@@ -508,7 +491,7 @@ func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority P
508491
}
509492

510493
// DistributeRecordToPeer sends a record directly to a specific peer
511-
func (d *P2PDistributor) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID, priority Priority) error {
494+
func (d *P2PDistributor) DistributeRecordToPeer(key [32]byte, value []byte, peerID peer.ID, priority types.Priority) error {
512495
// Create a deep copy of the value when a record first enters the system
513496
valueCopy := make([]byte, len(value))
514497
copy(valueCopy, value)
@@ -521,18 +504,18 @@ func (d *P2PDistributor) DistributeRecordToPeer(key [32]byte, value []byte, peer
521504
batch := &RecordBatch{
522505
Records: []db.WriteRequest{record},
523506
Priority: priority,
524-
Target: TargetDirectPeer,
507+
Target: types.TargetDirectPeer,
525508
TargetPeer: &peerID,
526509
}
527510

528511
// Queue based on priority
529512
var queue chan *RecordBatch
530513
switch priority {
531-
case PriorityHigh:
514+
case types.PriorityHigh:
532515
queue = d.highPriorityQueue
533-
case PriorityNormal:
516+
case types.PriorityNormal:
534517
queue = d.normalQueue
535-
case PriorityLow:
518+
case types.PriorityLow:
536519
queue = d.lowPriorityQueue
537520
}
538521

pkg/types/priority.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package types
2+
3+
// Priority represents the importance level of a record batch
4+
type Priority uint8
5+
6+
const (
7+
PriorityHigh Priority = iota
8+
PriorityNormal
9+
PriorityLow
10+
)

pkg/types/target.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package types
2+
3+
// Target specifies the distribution target type
4+
type Target uint8
5+
6+
const (
7+
TargetAll Target = iota
8+
TargetValidators
9+
TargetDirectPeer
10+
)

0 commit comments

Comments
 (0)