Skip to content

Commit a916362

Browse files
ae86zhizhi authored and autopear committed
Fix race condition by checking manager stopped state before creating context
Signed-off-by: ZHENYU <[email protected]>
1 parent dea11c0 commit a916362

File tree

8 files changed

+39
-21
lines changed

8 files changed

+39
-21
lines changed

pkg/kvevent/doc.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -37,14 +37,14 @@
3737
//
3838
// The syncIndexerAdapter in pkg/cache might seem like unnecessary complexity,
3939
// but it serves critical architectural purposes:
40-
// 1. Prevents circular dependencies between packages
41-
// 2. Acts as an Anti-Corruption Layer between domains
42-
// 3. Allows packages to remain independent and testable
43-
// 4. Handles type conversion between nearly-identical structs
40+
// 1. Prevents circular dependencies between packages
41+
// 2. Acts as an Anti-Corruption Layer between domains
42+
// 3. Allows packages to remain independent and testable
43+
// 4. Handles type conversion between nearly-identical structs
4444
//
4545
// Build Requirements:
4646
//
4747
// This package requires ZMQ support. Build with:
48-
// go build -tags=zmq
4948
//
50-
package kvevent
49+
// go build -tags=zmq
50+
package kvevent

pkg/kvevent/errors.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,4 +42,4 @@ func IsTemporaryError(err error) bool {
4242
return errors.Is(err, ErrIndexerNotInitialized) ||
4343
errors.Is(err, context.DeadlineExceeded) ||
4444
errors.Is(err, context.Canceled)
45-
}
45+
}

pkg/kvevent/handler.go

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -38,13 +38,7 @@ type eventHandler struct {
3838

3939
// HandleEvent processes KV cache events
4040
func (h *eventHandler) HandleEvent(event kvcache.KVEvent) error {
41-
// Create context with timeout
42-
// Use 10s timeout for event processing as sync indexer operations
43-
// may involve Redis calls and network I/O under high load
44-
ctx, cancel := context.WithTimeout(h.manager.ctx, 10*time.Second)
45-
defer cancel()
46-
47-
// Check if manager is stopped
41+
// Check if manager is stopped before using its context
4842
h.manager.mu.RLock()
4943
stopped := h.manager.stopped
5044
h.manager.mu.RUnlock()
@@ -53,6 +47,12 @@ func (h *eventHandler) HandleEvent(event kvcache.KVEvent) error {
5347
return ErrManagerStopped
5448
}
5549

50+
// Create context with timeout
51+
// Use 10s timeout for event processing as sync indexer operations
52+
// may involve Redis calls and network I/O under high load
53+
ctx, cancel := context.WithTimeout(h.manager.ctx, 10*time.Second)
54+
defer cancel()
55+
5656
switch e := event.(type) {
5757
case *kvcache.BlockStoredEvent:
5858
return h.handleBlockStored(ctx, e)
@@ -155,4 +155,4 @@ func tokenIDsToBytes(tokenIDs []int32) []byte {
155155
binary.BigEndian.PutUint32(bytes[i*4:], uint32(id))
156156
}
157157
return bytes
158-
}
158+
}

pkg/kvevent/handler_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -83,8 +83,8 @@ func TestTokenIDsToBytes(t *testing.T) {
8383
name: "multiple tokens",
8484
tokenIDs: []int32{1, 256, 65535},
8585
expected: []byte{
86-
0, 0, 0, 1, // 1
87-
0, 0, 1, 0, // 256
86+
0, 0, 0, 1, // 1
87+
0, 0, 1, 0, // 256
8888
0, 0, 255, 255, // 65535
8989
},
9090
},
@@ -379,4 +379,4 @@ func TestBinaryEncoding(t *testing.T) {
379379
t.Errorf("Byte %d: expected %d, got %d", i, expected[i], bytes[i])
380380
}
381381
}
382-
}
382+
}

pkg/kvevent/interfaces.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -86,4 +86,4 @@ type BlockRemovedEvent struct {
8686
ModelName string
8787
LoraID int64
8888
SourcePod string
89-
}
89+
}

pkg/kvevent/manager.go

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -162,6 +162,15 @@ func (m *Manager) OnPodAdd(pod *v1.Pod) {
162162
return
163163
}
164164

165+
// Check if manager is stopped before using its context
166+
m.mu.RLock()
167+
stopped := m.stopped
168+
m.mu.RUnlock()
169+
170+
if stopped {
171+
return
172+
}
173+
165174
// Use 5s timeout for pod operations as they involve simple ZMQ ops
166175
ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
167176
defer cancel()
@@ -212,6 +221,15 @@ func (m *Manager) OnPodDelete(pod *v1.Pod) {
212221
podKey := utils.GeneratePodKey(pod.Namespace, pod.Name)
213222
m.unsubscribeFromPod(podKey)
214223

224+
// Check if manager is stopped before using its context
225+
m.mu.RLock()
226+
stopped := m.stopped
227+
m.mu.RUnlock()
228+
229+
if stopped {
230+
return
231+
}
232+
215233
// Clean up from sync indexer
216234
// Use 5s timeout for cleanup operations as they should be quick
217235
ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)

pkg/kvevent/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -90,4 +90,4 @@ func TestManagerCreation(t *testing.T) {
9090

9191
// Test Stop (should not panic)
9292
manager.Stop()
93-
}
93+
}

pkg/kvevent/test_helpers.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,4 +29,4 @@ func NewEventHandlerForTest(manager *Manager, podKey, modelName string, loraID i
2929
modelName: modelName,
3030
loraID: loraID,
3131
}
32-
}
32+
}

0 commit comments

Comments
 (0)