@@ -19,9 +19,10 @@ package controller
19
19
import (
20
20
"context"
21
21
"fmt"
22
+ "reflect"
23
+
22
24
"github.com/FunctionStream/function-stream/operator/utils"
23
25
"k8s.io/apimachinery/pkg/util/json"
24
- "reflect"
25
26
26
27
"gopkg.in/yaml.v3"
27
28
appsv1 "k8s.io/api/apps/v1"
@@ -33,8 +34,10 @@ import (
33
34
ctrl "sigs.k8s.io/controller-runtime"
34
35
"sigs.k8s.io/controller-runtime/pkg/builder"
35
36
"sigs.k8s.io/controller-runtime/pkg/client"
37
+ "sigs.k8s.io/controller-runtime/pkg/handler"
36
38
logf "sigs.k8s.io/controller-runtime/pkg/log"
37
39
"sigs.k8s.io/controller-runtime/pkg/predicate"
40
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
38
41
39
42
fsv1alpha1 "github.com/FunctionStream/function-stream/operator/api/v1alpha1"
40
43
)
@@ -81,7 +84,17 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
81
84
return ctrl.Result {}, err
82
85
}
83
86
84
- // 2. Get Package
87
+ // 2. Ensure Function has package label
88
+ if fn .Labels == nil {
89
+ fn .Labels = make (map [string ]string )
90
+ }
91
+ labelUpdated := false
92
+ if fn .Labels ["package" ] != fn .Spec .Package {
93
+ fn .Labels ["package" ] = fn .Spec .Package
94
+ labelUpdated = true
95
+ }
96
+
97
+ // 3. Get Package
85
98
var pkg fsv1alpha1.Package
86
99
if err := r .Get (ctx , types.NamespacedName {Name : fn .Spec .Package , Namespace : req .Namespace }, & pkg ); err != nil {
87
100
log .Error (err , "Failed to get Package" , "package" , fn .Spec .Package )
@@ -95,14 +108,14 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
95
108
return ctrl.Result {}, fmt .Errorf ("package %s has no image" , fn .Spec .Package )
96
109
}
97
110
98
- // 3 . Build config yaml content
111
+ // 4 . Build config yaml content
99
112
configYaml , err := buildFunctionConfigYaml (& fn , r .Config )
100
113
if err != nil {
101
114
log .Error (err , "Failed to marshal config yaml" )
102
115
return ctrl.Result {}, err
103
116
}
104
117
105
- // 4 . Build Deployment
118
+ // 5 . Build Deployment
106
119
deployName := fmt .Sprintf ("function-%s" , fn .Name )
107
120
var replicas int32 = 1
108
121
labels := map [string ]string {
@@ -132,19 +145,17 @@ EOF
132
145
},
133
146
Spec : corev1.PodSpec {
134
147
InitContainers : []corev1.Container {{
135
- Name : "init-config" ,
136
- Image : image ,
137
- ImagePullPolicy : corev1 .PullIfNotPresent ,
138
- Command : []string {"/bin/sh" , "-c" , initCommand },
148
+ Name : "init-config" ,
149
+ Image : image ,
150
+ Command : []string {"/bin/sh" , "-c" , initCommand },
139
151
VolumeMounts : []corev1.VolumeMount {{
140
152
Name : "function-config" ,
141
153
MountPath : "/config" ,
142
154
}},
143
155
}},
144
156
Containers : []corev1.Container {{
145
- Name : "function" ,
146
- Image : image ,
147
- ImagePullPolicy : corev1 .PullIfNotPresent ,
157
+ Name : "function" ,
158
+ Image : image ,
148
159
VolumeMounts : []corev1.VolumeMount {{
149
160
Name : "function-config" ,
150
161
MountPath : "/config" ,
168
179
return ctrl.Result {}, err
169
180
}
170
181
171
- // 5 . Create or Update Deployment
182
+ // 6 . Create or Update Deployment
172
183
var existingDeploy appsv1.Deployment
173
184
deployErr := r .Get (ctx , types.NamespacedName {Name : deployName , Namespace : fn .Namespace }, & existingDeploy )
174
185
if deployErr == nil {
199
210
}
200
211
}
201
212
213
+ // 8. Update Function labels if needed
214
+ if labelUpdated {
215
+ // Re-fetch the Function to ensure we have the latest version
216
+ var latestFn fsv1alpha1.Function
217
+ if err := r .Get (ctx , req .NamespacedName , & latestFn ); err != nil {
218
+ log .Error (err , "Failed to get latest Function for label update" )
219
+ return ctrl.Result {}, err
220
+ }
221
+ // Apply our label changes to the latest version
222
+ if latestFn .Labels == nil {
223
+ latestFn .Labels = make (map [string ]string )
224
+ }
225
+ latestFn .Labels ["package" ] = fn .Spec .Package
226
+ if err := r .Update (ctx , & latestFn ); err != nil {
227
+ return utils .HandleReconcileError (log , err , "Conflict when updating Function labels, will retry automatically" )
228
+ }
229
+ }
230
+
202
231
return ctrl.Result {}, nil
203
232
}
204
233
@@ -280,6 +309,38 @@ func (r *FunctionReconciler) SetupWithManager(mgr ctrl.Manager) error {
280
309
return ctrl .NewControllerManagedBy (mgr ).
281
310
For (& fsv1alpha1.Function {}).
282
311
Owns (& appsv1.Deployment {}, builder .WithPredicates (functionLabelPredicate )).
312
+ Watches (
313
+ & fsv1alpha1.Package {},
314
+ handler .EnqueueRequestsFromMapFunc (r .mapPackageToFunctions ),
315
+ ).
283
316
Named ("function" ).
284
317
Complete (r )
285
318
}
319
+
320
+ // mapPackageToFunctions maps Package changes to related Functions
321
+ func (r * FunctionReconciler ) mapPackageToFunctions (ctx context.Context , obj client.Object ) []reconcile.Request {
322
+ packageObj , ok := obj .(* fsv1alpha1.Package )
323
+ if ! ok {
324
+ return nil
325
+ }
326
+
327
+ // Get Functions that reference this Package using label selector
328
+ var functions fsv1alpha1.FunctionList
329
+ if err := r .List (ctx , & functions ,
330
+ client .InNamespace (packageObj .Namespace ),
331
+ client .MatchingLabels (map [string ]string {"package" : packageObj .Name })); err != nil {
332
+ return nil
333
+ }
334
+
335
+ var requests []reconcile.Request
336
+ for _ , function := range functions .Items {
337
+ requests = append (requests , reconcile.Request {
338
+ NamespacedName : types.NamespacedName {
339
+ Name : function .Name ,
340
+ Namespace : function .Namespace ,
341
+ },
342
+ })
343
+ }
344
+
345
+ return requests
346
+ }
0 commit comments