Skip to content

Commit bc9d7af

Browse files
committed
wip
1 parent 478875d commit bc9d7af

19 files changed

+173
-666
lines changed

adapters.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ var ErrInvalidCommand = errors.New("invalid command type")
1717
// ErrInvalidPool is returned when the pool type is not supported.
1818
var ErrInvalidPool = errors.New("invalid pool type")
1919

20-
// NewClientAdapter creates a new client adapter for regular Redis clients.
21-
func NewClientAdapter(client *baseClient) interfaces.ClientInterface {
20+
// newClientAdapter creates a new client adapter for regular Redis clients.
21+
func newClientAdapter(client *baseClient) interfaces.ClientInterface {
2222
return &clientAdapter{client: client}
2323
}
2424

async_handoff_integration_test.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
4141
}
4242

4343
// Create processor with event-driven handoff support
44-
processor := hitless.NewPoolHook(3, baseDialer, nil, nil)
44+
processor := hitless.NewPoolHook(baseDialer, nil, nil)
4545
defer processor.Shutdown(context.Background())
4646

4747
// Create a test pool with hooks
@@ -52,10 +52,12 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
5252
Dialer: func(ctx context.Context) (net.Conn, error) {
5353
return &mockNetConn{addr: "original:6379"}, nil
5454
},
55-
PoolHooks: hookManager,
56-
PoolSize: 5,
57-
PoolTimeout: time.Second,
55+
PoolSize: 5,
56+
PoolTimeout: time.Second,
5857
})
58+
59+
// Add the hook to the pool after creation
60+
testPool.AddPoolHook(processor)
5961
defer testPool.Close()
6062

6163
// Set the pool reference in the processor for connection removal on handoff failure
@@ -131,7 +133,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
131133
return &mockNetConn{addr: addr}, nil
132134
}
133135

134-
processor := hitless.NewPoolHook(3, baseDialer, nil, nil)
136+
processor := hitless.NewPoolHook(baseDialer, nil, nil)
135137
defer processor.Shutdown(context.Background())
136138

137139
// Create hooks manager and add processor as hook
@@ -142,12 +144,15 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
142144
Dialer: func(ctx context.Context) (net.Conn, error) {
143145
return &mockNetConn{addr: "original:6379"}, nil
144146
},
145-
PoolHooks: hookManager,
147+
146148
PoolSize: 10,
147149
PoolTimeout: time.Second,
148150
})
149151
defer testPool.Close()
150152

153+
// Add the hook to the pool after creation
154+
testPool.AddPoolHook(processor)
155+
151156
// Set the pool reference in the processor
152157
processor.SetPool(testPool)
153158

@@ -200,7 +205,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
200205
return nil, &net.OpError{Op: "dial", Err: &net.DNSError{Name: addr}}
201206
}
202207

203-
processor := hitless.NewPoolHook(3, failingDialer, nil, nil)
208+
processor := hitless.NewPoolHook(failingDialer, nil, nil)
204209
defer processor.Shutdown(context.Background())
205210

206211
// Create hooks manager and add processor as hook
@@ -211,12 +216,15 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
211216
Dialer: func(ctx context.Context) (net.Conn, error) {
212217
return &mockNetConn{addr: "original:6379"}, nil
213218
},
214-
PoolHooks: hookManager,
219+
215220
PoolSize: 3,
216221
PoolTimeout: time.Second,
217222
})
218223
defer testPool.Close()
219224

225+
// Add the hook to the pool after creation
226+
testPool.AddPoolHook(processor)
227+
220228
// Set the pool reference in the processor
221229
processor.SetPool(testPool)
222230

@@ -260,7 +268,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
260268
return &mockNetConn{addr: addr}, nil
261269
}
262270

263-
processor := hitless.NewPoolHook(3, slowDialer, nil, nil)
271+
processor := hitless.NewPoolHook(slowDialer, nil, nil)
264272

265273
// Create hooks manager and add processor as hook
266274
hookManager := pool.NewPoolHookManager()
@@ -270,12 +278,15 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
270278
Dialer: func(ctx context.Context) (net.Conn, error) {
271279
return &mockNetConn{addr: "original:6379"}, nil
272280
},
273-
PoolHooks: hookManager,
281+
274282
PoolSize: 2,
275283
PoolTimeout: time.Second,
276284
})
277285
defer testPool.Close()
278286

287+
// Add the hook to the pool after creation
288+
testPool.AddPoolHook(processor)
289+
279290
// Set the pool reference in the processor
280291
processor.SetPool(testPool)
281292

commands.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,9 @@ func (c cmdable) ClientInfo(ctx context.Context) *ClientInfoCmd {
524524
func (c cmdable) ClientMaintNotifications(ctx context.Context, enabled bool, endpointType string) *StatusCmd {
525525
args := []interface{}{"client", "maint_notifications"}
526526
if enabled {
527+
if endpointType == "" {
528+
endpointType = "none"
529+
}
527530
args = append(args, "on", "moving-endpoint-type", endpointType)
528531
} else {
529532
args = append(args, "off")

hitless/config_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func TestProcessorWithConfig(t *testing.T) {
249249
return &mockNetConn{addr: addr}, nil
250250
}
251251

252-
processor := NewPoolHook(3, baseDialer, config, nil)
252+
processor := NewPoolHook(baseDialer, config, nil)
253253
defer processor.Shutdown(context.Background())
254254

255255
// The processor should be created successfully with custom config
@@ -269,7 +269,7 @@ func TestProcessorWithConfig(t *testing.T) {
269269
return &mockNetConn{addr: addr}, nil
270270
}
271271

272-
processor := NewPoolHook(3, baseDialer, config, nil)
272+
processor := NewPoolHook(baseDialer, config, nil)
273273
defer processor.Shutdown(context.Background())
274274

275275
// Should work with partial config (defaults applied)
@@ -283,7 +283,7 @@ func TestProcessorWithConfig(t *testing.T) {
283283
return &mockNetConn{addr: addr}, nil
284284
}
285285

286-
processor := NewPoolHook(3, baseDialer, nil, nil)
286+
processor := NewPoolHook(baseDialer, nil, nil)
287287
defer processor.Shutdown(context.Background())
288288

289289
// Should use default config when nil is passed
@@ -308,7 +308,7 @@ func TestIntegrationWithApplyDefaults(t *testing.T) {
308308
}
309309

310310
// Create processor - should apply defaults to missing fields
311-
processor := NewPoolHook(3, baseDialer, partialConfig, nil)
311+
processor := NewPoolHook(baseDialer, partialConfig, nil)
312312
defer processor.Shutdown(context.Background())
313313

314314
// Processor should be created successfully

hitless/hitless_manager.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/redis/go-redis/v9/internal"
1212
"github.com/redis/go-redis/v9/internal/interfaces"
13+
"github.com/redis/go-redis/v9/internal/pool"
1314
)
1415

1516
// Push notification type constants for hitless upgrades
@@ -56,6 +57,7 @@ type HitlessManager struct {
5657
client interfaces.ClientInterface
5758
config *Config
5859
options interfaces.OptionsInterface
60+
pool pool.Pooler
5961

6062
// MOVING operation tracking - using sync.Map for better concurrent performance
6163
activeMovingOps sync.Map // map[MovingOperationKey]*MovingOperation
@@ -65,8 +67,9 @@ type HitlessManager struct {
6567
closed atomic.Bool // Manager closed state
6668

6769
// Notification hooks for extensibility
68-
hooks []NotificationHook
69-
hooksMu sync.RWMutex // Protects hooks slice
70+
hooks []NotificationHook
71+
hooksMu sync.RWMutex // Protects hooks slice
72+
poolHooksRef *PoolHook
7073
}
7174

7275
// MovingOperation tracks an active MOVING operation.
@@ -78,13 +81,14 @@ type MovingOperation struct {
7881
}
7982

8083
// NewHitlessManager creates a new simplified hitless manager.
81-
func NewHitlessManager(client interfaces.ClientInterface, config *Config) (*HitlessManager, error) {
84+
func NewHitlessManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*HitlessManager, error) {
8285
if client == nil {
8386
return nil, ErrInvalidClient
8487
}
8588

8689
hm := &HitlessManager{
8790
client: client,
91+
pool: pool,
8892
options: client.GetOptions(),
8993
config: config.Clone(),
9094
hooks: make([]NotificationHook, 0),
@@ -98,9 +102,10 @@ func NewHitlessManager(client interfaces.ClientInterface, config *Config) (*Hitl
98102
return hm, nil
99103
}
100104

101-
// CreatePoolHook creates a pool hook with a custom dialer.
102-
func (hm *HitlessManager) CreatePoolHook(baseDialer func(context.Context, string, string) (net.Conn, error)) *PoolHook {
103-
return hm.createPoolHook(baseDialer)
105+
// GetPoolHook creates a pool hook with a custom dialer.
106+
func (hm *HitlessManager) InitPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error)) {
107+
poolHook := hm.createPoolHook(baseDialer)
108+
hm.pool.AddPoolHook(poolHook)
104109
}
105110

106111
// setupPushNotifications sets up push notification handling by registering with the client's processor.
@@ -208,6 +213,16 @@ func (hm *HitlessManager) Close() error {
208213
return nil // Already closed
209214
}
210215

216+
// Shutdown the pool hook
217+
err := hm.poolHooksRef.Shutdown(context.Background())
218+
if err != nil {
219+
// was not able to close pool hook, keep closed state false
220+
hm.closed.Store(false)
221+
return err
222+
}
223+
// Remove the pool hook from the pool
224+
hm.pool.RemovePoolHook(hm.poolHooksRef)
225+
211226
// Clear all active operations
212227
hm.activeMovingOps.Range(func(key, value interface{}) bool {
213228
hm.activeMovingOps.Delete(key)
@@ -299,13 +314,17 @@ func (fh *FilterHook) PostHook(ctx context.Context, notificationType string, not
299314

300315
// createPoolHook creates a pool hook with this manager already set.
301316
func (hm *HitlessManager) createPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error)) *PoolHook {
317+
if hm.poolHooksRef != nil {
318+
return hm.poolHooksRef
319+
}
302320
// Get pool size from client options for better worker defaults
303321
poolSize := 0
304322
if hm.options != nil {
305323
poolSize = hm.options.GetPoolSize()
306324
}
307325

308-
hook := NewPoolHookWithPoolSize(baseDialer, hm.config, hm, poolSize)
326+
hm.poolHooksRef = NewPoolHookWithPoolSize(baseDialer, hm.config, hm, poolSize)
327+
hm.poolHooksRef.SetPool(hm.pool)
309328

310-
return hook
329+
return hm.poolHooksRef
311330
}

0 commit comments

Comments
 (0)