Skip to content

Commit d7f70e4

Browse files
committed
feature: use watch instead of client get
Signed-off-by: googs1025 <[email protected]>
1 parent dbdb8c9 commit d7f70e4

File tree

3 files changed

+53
-15
lines changed

3 files changed

+53
-15
lines changed

pkg/controller/inference/service_controller.go

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"reflect"
23+
"sync"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
apimeta "k8s.io/apimachinery/pkg/api/meta"
@@ -39,6 +40,7 @@ import (
3940
"sigs.k8s.io/controller-runtime/pkg/handler"
4041
"sigs.k8s.io/controller-runtime/pkg/log"
4142
"sigs.k8s.io/controller-runtime/pkg/predicate"
43+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4244
lws "sigs.k8s.io/lws/api/leaderworkerset/v1"
4345
applyconfigurationv1 "sigs.k8s.io/lws/client-go/applyconfiguration/leaderworkerset/v1"
4446

@@ -52,8 +54,10 @@ import (
5254
// ServiceReconciler reconciles a Service object
5355
type ServiceReconciler struct {
5456
client.Client
55-
Scheme *runtime.Scheme
56-
Record record.EventRecorder
57+
Scheme *runtime.Scheme
58+
Record record.EventRecorder
59+
GlobalConfigsMutex sync.RWMutex
60+
GlobalConfigs *helper.GlobalConfigs
5761
}
5862

5963
func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *ServiceReconciler {
@@ -86,15 +90,12 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
8690

8791
logger.V(10).Info("reconcile Service", "Service", klog.KObj(service))
8892

89-
cm := &corev1.ConfigMap{}
90-
if err := r.Get(ctx, types.NamespacedName{Name: "llmaz-global-config", Namespace: "llmaz-system"}, cm); err != nil {
91-
if client.IgnoreNotFound(err) != nil {
92-
return ctrl.Result{}, fmt.Errorf("failed to get llmaz-global-config configmap: %w", err)
93-
}
94-
}
95-
configs, err := helper.ParseGlobalConfigmap(cm)
96-
if err != nil {
97-
return ctrl.Result{}, fmt.Errorf("failed to parse global configurations: %w", err)
93+
r.GlobalConfigsMutex.RLock()
94+
configs := r.GlobalConfigs
95+
r.GlobalConfigsMutex.RUnlock()
96+
97+
if configs == nil {
98+
return ctrl.Result{}, fmt.Errorf("globel configs not init")
9899
}
99100

100101
// Set the global configurations to the service.
@@ -160,9 +161,39 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
160161
return !reflect.DeepEqual(oldBar.Status, newBar.Status)
161162
},
162163
})).
164+
Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.updateGlobalConfig),
165+
builder.WithPredicates(predicate.Funcs{
166+
UpdateFunc: func(e event.UpdateEvent) bool {
167+
cm := e.ObjectOld.(*corev1.ConfigMap)
168+
return cm.Name == helper.GlobalConfigMapName && cm.Namespace == helper.GlobalConfigMapNamespace
169+
},
170+
CreateFunc: func(e event.CreateEvent) bool {
171+
cm := e.Object.(*corev1.ConfigMap)
172+
return cm.Name == helper.GlobalConfigMapName && cm.Namespace == helper.GlobalConfigMapNamespace
173+
},
174+
})).
163175
Complete(r)
164176
}
165177

178+
func (r *ServiceReconciler) updateGlobalConfig(ctx context.Context, obj client.Object) []reconcile.Request {
179+
logger := log.FromContext(ctx)
180+
cm, ok := obj.(*corev1.ConfigMap)
181+
if !ok {
182+
return nil
183+
}
184+
185+
newConfig, err := helper.ParseGlobalConfigmap(cm)
186+
if err != nil {
187+
logger.Error(err, "failed to parse global config")
188+
return nil
189+
}
190+
r.GlobalConfigsMutex.Lock()
191+
defer r.GlobalConfigsMutex.Unlock()
192+
r.GlobalConfigs = newConfig
193+
logger.Info("global config updated", "config", newConfig)
194+
return nil
195+
}
196+
166197
func buildWorkloadApplyConfiguration(service *inferenceapi.Service, models []*coreapi.OpenModel, configs *helper.GlobalConfigs) (*applyconfigurationv1.LeaderWorkerSetApplyConfiguration, error) {
167198
workload := applyconfigurationv1.LeaderWorkerSet(service.Name, service.Namespace)
168199

@@ -216,17 +247,17 @@ func buildWorkloadApplyConfiguration(service *inferenceapi.Service, models []*co
216247
return workload, nil
217248
}
218249

219-
func injectModelProperties(template *applyconfigurationv1.LeaderWorkerTemplateApplyConfiguration, models []*coreapi.OpenModel, service *inferenceapi.Service, configs *helper.GlobalConfigs) {
250+
func injectModelProperties(template *applyconfigurationv1.LeaderWorkerTemplateApplyConfiguration, models []*coreapi.OpenModel, service *inferenceapi.Service, config *helper.GlobalConfigs) {
220251
isMultiNodesInference := template.LeaderTemplate != nil
221252

222253
for i, model := range models {
223254
source := modelSource.NewModelSourceProvider(model)
224255
// Skip model-loader initContainer if llmaz.io/skip-model-loader annotation is set.
225256
if !helper.SkipModelLoader(service) {
226257
if isMultiNodesInference {
227-
source.InjectModelLoader(template.LeaderTemplate, i, configs.InitContainerImage)
258+
source.InjectModelLoader(template.LeaderTemplate, i, config.InitContainerImage)
228259
}
229-
source.InjectModelLoader(template.WorkerTemplate, i, configs.InitContainerImage)
260+
source.InjectModelLoader(template.WorkerTemplate, i, config.InitContainerImage)
230261
} else {
231262
if isMultiNodesInference {
232263
source.InjectModelEnvVars(template.LeaderTemplate)

pkg/controller_helper/configmap.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ import (
2323
corev1 "k8s.io/api/core/v1"
2424
)
2525

26+
const (
27+
GlobalConfigMapName = "llmaz-global-config"
28+
GlobalConfigMapNamespace = "llmaz-system"
29+
)
30+
31+
// GlobalConfigs defines the global configuration parameters used across services.
32+
// These configurations are typically provided via a ConfigMap named "llmaz-global-config"
2633
type GlobalConfigs struct {
2734
SchedulerName string `yaml:"scheduler-name"`
2835
InitContainerImage string `yaml:"init-container-image"`

pkg/controller_helper/configmap_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/stretchr/testify/require"
2424
)
2525

26-
func TestGlobalConfigs_validate(t *testing.T) {
26+
func TestGlobalConfig_validate(t *testing.T) {
2727
tests := []struct {
2828
name string
2929
config *GlobalConfigs

0 commit comments

Comments
 (0)