@@ -20,6 +20,7 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"reflect"
23
+ "sync"
23
24
24
25
corev1 "k8s.io/api/core/v1"
25
26
apimeta "k8s.io/apimachinery/pkg/api/meta"
@@ -39,6 +40,7 @@ import (
39
40
"sigs.k8s.io/controller-runtime/pkg/handler"
40
41
"sigs.k8s.io/controller-runtime/pkg/log"
41
42
"sigs.k8s.io/controller-runtime/pkg/predicate"
43
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
42
44
lws "sigs.k8s.io/lws/api/leaderworkerset/v1"
43
45
applyconfigurationv1 "sigs.k8s.io/lws/client-go/applyconfiguration/leaderworkerset/v1"
44
46
@@ -52,8 +54,10 @@ import (
52
54
// ServiceReconciler reconciles a Service object
53
55
type ServiceReconciler struct {
54
56
client.Client
55
- Scheme * runtime.Scheme
56
- Record record.EventRecorder
57
+ Scheme * runtime.Scheme
58
+ Record record.EventRecorder
59
+ GlobalConfigMutex sync.RWMutex
60
+ GlobalConfig * helper.GlobalConfig
57
61
}
58
62
59
63
func NewServiceReconciler (client client.Client , scheme * runtime.Scheme , record record.EventRecorder ) * ServiceReconciler {
@@ -86,24 +90,21 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
86
90
87
91
logger .V (10 ).Info ("reconcile Service" , "Service" , klog .KObj (service ))
88
92
89
- cm := & corev1.ConfigMap {}
90
- if err := r .Get (ctx , types.NamespacedName {Name : "llmaz-global-config" , Namespace : "llmaz-system" }, cm ); err != nil {
91
- if client .IgnoreNotFound (err ) != nil {
92
- return ctrl.Result {}, fmt .Errorf ("failed to get llmaz-global-config configmap: %w" , err )
93
- }
94
- }
95
- configs , err := helper .ParseGlobalConfigmap (cm )
96
- if err != nil {
97
- return ctrl.Result {}, fmt .Errorf ("failed to parse global configurations: %w" , err )
93
+ r .GlobalConfigMutex .RLock ()
94
+ config := r .GlobalConfig
95
+ r .GlobalConfigMutex .RUnlock ()
96
+
97
+ if config == nil {
98
+ return ctrl.Result {}, fmt .Errorf ("globel configs not init" )
98
99
}
99
100
100
101
// Set the global configurations to the service.
101
- if configs .SchedulerName != "" {
102
+ if config .SchedulerName != "" {
102
103
if service .Spec .WorkloadTemplate .LeaderTemplate != nil && service .Spec .WorkloadTemplate .LeaderTemplate .Spec .SchedulerName == "" {
103
- service .Spec .WorkloadTemplate .LeaderTemplate .Spec .SchedulerName = configs .SchedulerName
104
+ service .Spec .WorkloadTemplate .LeaderTemplate .Spec .SchedulerName = config .SchedulerName
104
105
}
105
106
if service .Spec .WorkloadTemplate .WorkerTemplate .Spec .SchedulerName == "" {
106
- service .Spec .WorkloadTemplate .WorkerTemplate .Spec .SchedulerName = configs .SchedulerName
107
+ service .Spec .WorkloadTemplate .WorkerTemplate .Spec .SchedulerName = config .SchedulerName
107
108
}
108
109
109
110
if err := r .Client .Update (ctx , service ); err != nil {
@@ -146,6 +147,9 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
146
147
147
148
// SetupWithManager sets up the controller with the Manager.
148
149
func (r * ServiceReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
150
+ if err := r .InitializeGlobalConfig (context .Background ()); err != nil {
151
+ return fmt .Errorf ("failed to initialize global config: %w" , err )
152
+ }
149
153
return ctrl .NewControllerManagedBy (mgr ).
150
154
For (& inferenceapi.Service {}).
151
155
Watches (& lws.LeaderWorkerSet {}, handler .EnqueueRequestForOwner (r .Scheme , r .RESTMapper (), & inferenceapi.Service {}, handler .OnlyControllerOwner ()),
@@ -156,9 +160,54 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
156
160
return ! reflect .DeepEqual (oldBar .Status , newBar .Status )
157
161
},
158
162
})).
163
+ Watches (& corev1.ConfigMap {}, handler .EnqueueRequestsFromMapFunc (r .updateGlobalConfig ),
164
+ builder .WithPredicates (predicate.Funcs {
165
+ UpdateFunc : func (e event.UpdateEvent ) bool {
166
+ cm := e .ObjectOld .(* corev1.ConfigMap )
167
+ return cm .Name == helper .GlobalConfigMapName && cm .Namespace == helper .GlobalConfigMapNamespace
168
+ },
169
+ CreateFunc : func (e event.CreateEvent ) bool {
170
+ cm := e .Object .(* corev1.ConfigMap )
171
+ return cm .Name == helper .GlobalConfigMapName && cm .Namespace == helper .GlobalConfigMapNamespace
172
+ },
173
+ })).
159
174
Complete (r )
160
175
}
161
176
177
+ func (r * ServiceReconciler ) InitializeGlobalConfig (ctx context.Context ) error {
178
+ cm := & corev1.ConfigMap {}
179
+ key := types.NamespacedName {Name : helper .GlobalConfigMapName , Namespace : helper .GlobalConfigMapNamespace }
180
+ if err := r .Get (ctx , key , cm ); err != nil {
181
+ return fmt .Errorf ("failed to get initial global config: %w" , err )
182
+ }
183
+
184
+ config , err := helper .ParseGlobalConfigmap (cm )
185
+ if err != nil {
186
+ return fmt .Errorf ("failed to parse initial global config: %w" , err )
187
+ }
188
+
189
+ r .GlobalConfigMutex .Lock ()
190
+ defer r .GlobalConfigMutex .Unlock ()
191
+ r .GlobalConfig = config
192
+ return nil
193
+ }
194
+
195
+ func (r * ServiceReconciler ) updateGlobalConfig (ctx context.Context , obj client.Object ) []reconcile.Request {
196
+ logger := log .FromContext (ctx )
197
+ cm := obj .(* corev1.ConfigMap )
198
+
199
+ newConfig , err := helper .ParseGlobalConfigmap (cm )
200
+ if err != nil {
201
+ logger .Error (err , "failed to parse global config" )
202
+ return nil
203
+ }
204
+ r .GlobalConfigMutex .Lock ()
205
+ defer r .GlobalConfigMutex .Unlock ()
206
+ r .GlobalConfig = newConfig
207
+ logger .Info ("global config updated" , "config" , newConfig )
208
+ return nil
209
+ }
210
+
162
211
func buildWorkloadApplyConfiguration (service * inferenceapi.Service , models []* coreapi.OpenModel ) * applyconfigurationv1.LeaderWorkerSetApplyConfiguration {
163
212
workload := applyconfigurationv1 .LeaderWorkerSet (service .Name , service .Namespace )
164
213
0 commit comments