Skip to content

Commit dea11c0

Browse files
Qizhong Maoautopear
authored and committed
Address review comments
Signed-off-by: Qizhong Mao <[email protected]>
1 parent 5fbd817 commit dea11c0

17 files changed

+192
-213
lines changed

cmd/plugins/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"net/http"
2323
"os"
2424
"os/signal"
25-
"strconv"
2625
"syscall"
2726

2827
"google.golang.org/grpc"
@@ -33,6 +32,7 @@ import (
3332

3433
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
3534
"github.com/vllm-project/aibrix/pkg/cache"
35+
"github.com/vllm-project/aibrix/pkg/constants"
3636
"github.com/vllm-project/aibrix/pkg/plugins/gateway"
3737
routing "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms"
3838
"github.com/vllm-project/aibrix/pkg/utils"
@@ -80,8 +80,8 @@ func main() {
8080
}
8181

8282
// Initialize cache with KV sync enabled for gateway
83-
kvSyncEnabled, _ := strconv.ParseBool(utils.LoadEnv("AIBRIX_KV_EVENT_SYNC_ENABLED", "false"))
84-
remoteTokenizerEnabled, _ := strconv.ParseBool(utils.LoadEnv("AIBRIX_USE_REMOTE_TOKENIZER", "false"))
83+
kvSyncEnabled := utils.LoadEnvBool(constants.EnvPrefixCacheKVEventSyncEnabled, false)
84+
remoteTokenizerEnabled := utils.LoadEnvBool(constants.EnvPrefixCacheUseRemoteTokenizer, false)
8585

8686
cache.InitWithOptions(config, stopCh, cache.InitOptions{
8787
EnableKVSync: kvSyncEnabled && remoteTokenizerEnabled,

docs/kv-event-sync-readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ This directory contains the core documentation for KV Event Synchronization feat
2626
## Quick Links
2727

2828
- **Feature Requirements**: vLLM 0.7.0+, ZMQ support, Remote tokenizer
29-
- **Key Environment Variable**: `AIBRIX_KV_EVENT_SYNC_ENABLED=true`
29+
- **Key Environment Variable**: `AIBRIX_PREFIX_CACHE_KV_EVENT_SYNC_ENABLED=true`
3030
- **Build Tag**: `-tags="zmq"` (for gateway-plugins only)
3131

3232
## Getting Started

docs/source/features/kv-event-sync.rst

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,16 @@ Environment Variables
5656
* - Variable
5757
- Default
5858
- Description
59-
* - ``AIBRIX_KV_EVENT_SYNC_ENABLED``
59+
* - ``AIBRIX_PREFIX_CACHE_KV_EVENT_SYNC_ENABLED``
6060
- ``false``
6161
- Enable KV event synchronization
62-
* - ``AIBRIX_USE_REMOTE_TOKENIZER``
62+
* - ``AIBRIX_PREFIX_CACHE_USE_REMOTE_TOKENIZER``
6363
- ``false``
6464
- Must be ``true`` for KV sync
65-
* - ``AIBRIX_REMOTE_TOKENIZER_ENDPOINT``
65+
* - ``AIBRIX_PREFIX_CACHE_REMOTE_TOKENIZER_ENDPOINT``
6666
- -
6767
- vLLM service endpoint
68-
* - ``AIBRIX_PREFIX_CACHE_METRICS_ENABLED``
68+
* - ``AIBRIX_PREFIX_CACHE_LOCAL_ROUTER_METRICS_ENABLED``
6969
- ``false``
7070
- Enable prefix cache metrics
7171

@@ -121,18 +121,18 @@ Quick Start
121121
1. **Enable Remote Tokenizer** (mandatory prerequisite)::
122122

123123
kubectl set env deployment/aibrix-gateway-plugins -n aibrix-system \
124-
AIBRIX_USE_REMOTE_TOKENIZER=true \
125-
AIBRIX_REMOTE_TOKENIZER_ENDPOINT=http://vllm-service:8000
124+
AIBRIX_PREFIX_CACHE_USE_REMOTE_TOKENIZER=true \
125+
AIBRIX_PREFIX_CACHE_REMOTE_TOKENIZER_ENDPOINT=http://vllm-service:8000
126126

127127
2. **Enable KV Event Sync**::
128128

129129
kubectl set env deployment/aibrix-gateway-plugins -n aibrix-system \
130-
AIBRIX_KV_EVENT_SYNC_ENABLED=true
130+
AIBRIX_PREFIX_CACHE_KV_EVENT_SYNC_ENABLED=true
131131

132132
3. **Enable Prefix Cache Metrics** (optional but recommended)::
133133

134134
kubectl set env deployment/aibrix-gateway-plugins -n aibrix-system \
135-
AIBRIX_PREFIX_CACHE_METRICS_ENABLED=true
135+
AIBRIX_PREFIX_CACHE_LOCAL_ROUTER_METRICS_ENABLED=true
136136

137137
4. **Deploy vLLM with KV Events**:
138138

@@ -291,7 +291,7 @@ To disable KV event sync::
291291

292292
# Disable in gateway
293293
kubectl set env deployment/aibrix-gateway-plugins -n aibrix-system \
294-
AIBRIX_KV_EVENT_SYNC_ENABLED=false
294+
AIBRIX_PREFIX_CACHE_KV_EVENT_SYNC_ENABLED=false
295295

296296
# Remove from vLLM deployments
297297
kubectl label deployment vllm-model model.aibrix.ai/kv-events-enabled-

pkg/cache/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ Manages KV cache event synchronization between vLLM pods and the routing system:
3232
// It subscribes to eligible pods and processes their KV cache events
3333

3434
// Configuration via environment variables:
35-
// AIBRIX_KV_EVENT_SYNC_ENABLED=true
36-
// AIBRIX_USE_REMOTE_TOKENIZER=true
35+
// AIBRIX_PREFIX_CACHE_KV_EVENT_SYNC_ENABLED=true
36+
// AIBRIX_PREFIX_CACHE_USE_REMOTE_TOKENIZER=true
3737
```
3838

3939
**Features:**
@@ -89,8 +89,8 @@ Kubernetes informers for watching:
8989
- `AIBRIX_POD_RAYCLUSTERFLEET_LABEL`: Label for fleet identification
9090

9191
**KV Event Sync:**
92-
- `AIBRIX_KV_EVENT_SYNC_ENABLED`: Enable KV event synchronization
93-
- `AIBRIX_USE_REMOTE_TOKENIZER`: Enable remote tokenizer (required for KV sync)
92+
- `AIBRIX_PREFIX_CACHE_KV_EVENT_SYNC_ENABLED`: Enable KV event synchronization
93+
- `AIBRIX_PREFIX_CACHE_USE_REMOTE_TOKENIZER`: Enable remote tokenizer (required for KV sync)
9494

9595
**Performance:**
9696
- `AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS`: Metric refresh interval
@@ -100,8 +100,8 @@ Kubernetes informers for watching:
100100

101101
```go
102102
// Create cache with KV event sync enabled
103-
os.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
104-
os.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
103+
os.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
104+
os.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
105105

106106
store := cache.NewStore()
107107

pkg/cache/build_verification_zmq_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@ package cache
1818

1919
import (
2020
"testing"
21+
22+
"github.com/vllm-project/aibrix/pkg/constants"
2123
)
2224

2325
func TestBuildModeIsZMQ(t *testing.T) {
2426
t.Log("✅ Verified ZMQ build")
2527

2628
// Set environment to enable KV sync
27-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
28-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
29+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
30+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
2931

3032
// Verify ZMQ implementation behavior
3133
manager := NewKVEventManager(nil)

pkg/cache/cache_init.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"strconv"
2423
"sync"
2524
"sync/atomic"
2625
"time"
@@ -31,6 +30,7 @@ import (
3130
"k8s.io/klog/v2"
3231

3332
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
33+
"github.com/vllm-project/aibrix/pkg/constants"
3434
"github.com/vllm-project/aibrix/pkg/metrics"
3535
"github.com/vllm-project/aibrix/pkg/utils"
3636
syncindexer "github.com/vllm-project/aibrix/pkg/utils/syncprefixcacheindexer"
@@ -431,10 +431,8 @@ func (s *Store) initKVEventSync() error {
431431
klog.Info("Initializing KV event synchronization")
432432

433433
// Check if KV sync should be enabled
434-
kvSyncValue := utils.LoadEnv("AIBRIX_KV_EVENT_SYNC_ENABLED", "false")
435-
kvSyncEnabled, _ := strconv.ParseBool(kvSyncValue)
436-
remoteTokenValue := utils.LoadEnv("AIBRIX_USE_REMOTE_TOKENIZER", "false")
437-
remoteTokenizerEnabled, _ := strconv.ParseBool(remoteTokenValue)
434+
kvSyncEnabled := utils.LoadEnvBool(constants.EnvPrefixCacheKVEventSyncEnabled, false)
435+
remoteTokenizerEnabled := utils.LoadEnvBool(constants.EnvPrefixCacheUseRemoteTokenizer, false)
438436

439437
// Early return if not enabled
440438
if !kvSyncEnabled {

pkg/cache/cache_init_test.go

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/redis/go-redis/v9"
2525
"github.com/stretchr/testify/assert"
26+
"github.com/vllm-project/aibrix/pkg/constants"
2627
)
2728

2829
func TestInitKVEventSync_FailureCleanup(t *testing.T) {
@@ -35,50 +36,50 @@ func TestInitKVEventSync_FailureCleanup(t *testing.T) {
3536
{
3637
name: "cleanup on Start failure - remote tokenizer not configured",
3738
setupEnv: func(t *testing.T) {
38-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
39-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
40-
t.Setenv("AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE", "")
41-
t.Setenv("AIBRIX_REMOTE_TOKENIZER_ENDPOINT", "")
39+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
40+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
41+
t.Setenv(constants.EnvPrefixCacheTokenizerType, "")
42+
t.Setenv(constants.EnvPrefixCacheRemoteTokenizerEndpoint, "")
4243
},
4344
expectCleanup: true,
4445
expectError: true,
4546
},
4647
{
4748
name: "cleanup on Start failure - invalid tokenizer type",
4849
setupEnv: func(t *testing.T) {
49-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
50-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
51-
t.Setenv("AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE", "local") // Should be "remote"
52-
t.Setenv("AIBRIX_REMOTE_TOKENIZER_ENDPOINT", "http://test:8080")
50+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
51+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
52+
t.Setenv(constants.EnvPrefixCacheTokenizerType, "local") // Should be "remote"
53+
t.Setenv(constants.EnvPrefixCacheRemoteTokenizerEndpoint, "http://test:8080")
5354
},
5455
expectCleanup: true,
5556
expectError: true,
5657
},
5758
{
5859
name: "no cleanup on success",
5960
setupEnv: func(t *testing.T) {
60-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
61-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
62-
t.Setenv("AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE", "remote")
63-
t.Setenv("AIBRIX_REMOTE_TOKENIZER_ENDPOINT", "http://test:8080")
61+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
62+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
63+
t.Setenv(constants.EnvPrefixCacheTokenizerType, "remote")
64+
t.Setenv(constants.EnvPrefixCacheRemoteTokenizerEndpoint, "http://test:8080")
6465
},
6566
expectCleanup: false,
6667
expectError: false,
6768
},
6869
{
6970
name: "no error when KV sync disabled",
7071
setupEnv: func(t *testing.T) {
71-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "false")
72-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
72+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "false")
73+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
7374
},
7475
expectCleanup: false,
7576
expectError: false,
7677
},
7778
{
7879
name: "no error when remote tokenizer disabled",
7980
setupEnv: func(t *testing.T) {
80-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
81-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "false")
81+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
82+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "false")
8283
},
8384
expectCleanup: false,
8485
expectError: false,
@@ -110,8 +111,8 @@ func TestInitKVEventSync_FailureCleanup(t *testing.T) {
110111
assert.Nil(t, store.kvEventManager)
111112
assert.Nil(t, store.syncPrefixIndexer)
112113
} else if !expectedError &&
113-
os.Getenv("AIBRIX_KV_EVENT_SYNC_ENABLED") == "true" &&
114-
os.Getenv("AIBRIX_USE_REMOTE_TOKENIZER") == "true" {
114+
os.Getenv(constants.EnvPrefixCacheKVEventSyncEnabled) == "true" &&
115+
os.Getenv(constants.EnvPrefixCacheUseRemoteTokenizer) == "true" {
115116
assert.NotNil(t, store.kvEventManager)
116117
assert.NotNil(t, store.syncPrefixIndexer)
117118
}
@@ -134,10 +135,10 @@ func TestCleanupKVEventSync_Idempotent(t *testing.T) {
134135
}
135136

136137
func TestStore_Close_CallsCleanup(t *testing.T) {
137-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
138-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
139-
t.Setenv("AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE", "remote")
140-
t.Setenv("AIBRIX_REMOTE_TOKENIZER_ENDPOINT", "http://test:8080")
138+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
139+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
140+
t.Setenv(constants.EnvPrefixCacheTokenizerType, "remote")
141+
t.Setenv(constants.EnvPrefixCacheRemoteTokenizerEndpoint, "http://test:8080")
141142

142143
store := &Store{}
143144
err := store.initKVEventSync()
@@ -172,17 +173,17 @@ func TestInitWithOptions_KVSyncBehavior(t *testing.T) {
172173
},
173174
expectKVSync: false,
174175
setupEnv: func(t *testing.T) {
175-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
176-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
176+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
177+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
177178
},
178179
},
179180
{
180181
name: "controller - no KV sync",
181182
opts: InitOptions{},
182183
expectKVSync: false,
183184
setupEnv: func(t *testing.T) {
184-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "true")
185-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "true")
185+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "true")
186+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "true")
186187
},
187188
},
188189
{
@@ -193,8 +194,8 @@ func TestInitWithOptions_KVSyncBehavior(t *testing.T) {
193194
},
194195
expectKVSync: false,
195196
setupEnv: func(t *testing.T) {
196-
t.Setenv("AIBRIX_KV_EVENT_SYNC_ENABLED", "false")
197-
t.Setenv("AIBRIX_USE_REMOTE_TOKENIZER", "false")
197+
t.Setenv(constants.EnvPrefixCacheKVEventSyncEnabled, "false")
198+
t.Setenv(constants.EnvPrefixCacheUseRemoteTokenizer, "false")
198199
},
199200
},
200201
}

pkg/cache/kv_event_manager_validation_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,44 +35,44 @@ func TestKVEventManagerValidation(t *testing.T) {
3535
{
3636
name: "disabled - no error",
3737
envVars: map[string]string{
38-
constants.EnvKVEventSyncEnabled: "false",
38+
constants.EnvPrefixCacheKVEventSyncEnabled: "false",
3939
},
4040
wantError: false,
4141
},
4242
{
4343
name: "enabled without tokenizer - error",
4444
envVars: map[string]string{
45-
constants.EnvKVEventSyncEnabled: "true",
46-
constants.EnvUseRemoteTokenizer: "false",
45+
constants.EnvPrefixCacheKVEventSyncEnabled: "true",
46+
constants.EnvPrefixCacheUseRemoteTokenizer: "false",
4747
},
4848
wantError: true,
4949
},
5050
{
5151
name: "enabled without tokenizer type - error",
5252
envVars: map[string]string{
53-
constants.EnvKVEventSyncEnabled: "true",
54-
constants.EnvUseRemoteTokenizer: "true",
55-
constants.EnvPrefixCacheTokenizerType: "local",
53+
constants.EnvPrefixCacheKVEventSyncEnabled: "true",
54+
constants.EnvPrefixCacheUseRemoteTokenizer: "true",
55+
constants.EnvPrefixCacheTokenizerType: "local",
5656
},
5757
wantError: true,
5858
},
5959
{
6060
name: "enabled without endpoint - error",
6161
envVars: map[string]string{
62-
constants.EnvKVEventSyncEnabled: "true",
63-
constants.EnvUseRemoteTokenizer: "true",
64-
constants.EnvPrefixCacheTokenizerType: "remote",
65-
constants.EnvRemoteTokenizerEndpoint: "",
62+
constants.EnvPrefixCacheKVEventSyncEnabled: "true",
63+
constants.EnvPrefixCacheUseRemoteTokenizer: "true",
64+
constants.EnvPrefixCacheTokenizerType: "remote",
65+
constants.EnvPrefixCacheRemoteTokenizerEndpoint: "",
6666
},
6767
wantError: true,
6868
},
6969
{
7070
name: "enabled with all config - no error",
7171
envVars: map[string]string{
72-
constants.EnvKVEventSyncEnabled: "true",
73-
constants.EnvUseRemoteTokenizer: "true",
74-
constants.EnvPrefixCacheTokenizerType: "remote",
75-
constants.EnvRemoteTokenizerEndpoint: "http://localhost:8080",
72+
constants.EnvPrefixCacheKVEventSyncEnabled: "true",
73+
constants.EnvPrefixCacheUseRemoteTokenizer: "true",
74+
constants.EnvPrefixCacheTokenizerType: "remote",
75+
constants.EnvPrefixCacheRemoteTokenizerEndpoint: "http://localhost:8080",
7676
},
7777
wantError: false,
7878
},

0 commit comments

Comments
 (0)