diff --git a/operator/Makefile b/operator/Makefile index 2a7dbfa..0d201b4 100644 --- a/operator/Makefile +++ b/operator/Makefile @@ -123,7 +123,8 @@ docker-push: ## Push docker image with the manager. # - have enabled BuildKit. More info: https://docs.docker.com/develop/develop-images/build_enhancements/ # - be able to push the image to your registry (i.e. if you do not set a valid value via IMG=> then the export will fail) # To adequately provide solutions that are compatible with multiple platforms, you should consider using this option. -PLATFORMS ?= linux/arm64,linux/amd64,linux/s390x,linux/ppc64le +#PLATFORMS ?= linux/arm64,linux/amd64,linux/s390x,linux/ppc64le +PLATFORMS ?= linux/arm64,linux/amd64 .PHONY: docker-buildx docker-buildx: ## Build and push docker image for the manager for cross-platform support # copy existing Dockerfile and insert --platform=${BUILDPLATFORM} into Dockerfile.cross, and preserve the original Dockerfile diff --git a/operator/api/v1alpha1/function_types.go b/operator/api/v1alpha1/function_types.go index fd00589..feb88d0 100644 --- a/operator/api/v1alpha1/function_types.go +++ b/operator/api/v1alpha1/function_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -25,8 +26,10 @@ import ( // +kubebuilder:validation:Optional type FunctionSpec struct { // Display name of the function + // +kubebuilder:validation:Optional DisplayName string `json:"displayName,omitempty"` // Description of the function + // +kubebuilder:validation:Optional Description string `json:"description,omitempty"` // Package name // +kubebuilder:validation:Required @@ -34,14 +37,20 @@ type FunctionSpec struct { // Module name // +kubebuilder:validation:Required Module string `json:"module"` + // +kubebuilder:validation:Optional + SubscriptionName string `json:"subscriptionName,omitempty"` // List of sources + // +kubebuilder:validation:Optional Sources []SourceSpec `json:"sources,omitempty"` // Request source - RequestSource SourceSpec `json:"requestSource,omitempty"` + // +kubebuilder:validation:Optional + RequestSource *SourceSpec `json:"requestSource,omitempty"` // Sink specifies the sink configuration - Sink SinkSpec `json:"sink,omitempty"` + // +kubebuilder:validation:Optional + Sink *SinkSpec `json:"sink,omitempty"` // Configurations as key-value pairs - Config map[string]string `json:"config,omitempty"` + // +kubebuilder:validation:Optional + Config map[string]v1.JSON `json:"config,omitempty"` } // SourceSpec defines a source or sink specification @@ -49,6 +58,7 @@ type FunctionSpec struct { // +kubebuilder:validation:Optional type SourceSpec struct { // Pulsar source specification + // +kubebuilder:validation:Optional Pulsar *PulsarSourceSpec `json:"pulsar,omitempty"` } @@ -59,9 +69,6 @@ type PulsarSourceSpec struct { // Topic name // +kubebuilder:validation:Required Topic string `json:"topic"` - // Subscription name - // +kubebuilder:validation:Required - SubscriptionName string `json:"subscriptionName"` } // SinkSpec defines a sink specification @@ -69,6 +76,7 @@ type PulsarSourceSpec struct { // +kubebuilder:validation:Optional type SinkSpec struct { // Pulsar sink specification + // +kubebuilder:validation:Optional Pulsar *PulsarSinkSpec `json:"pulsar,omitempty"` } diff --git a/operator/api/v1alpha1/packages_types.go b/operator/api/v1alpha1/packages_types.go index 7ba9208..c51b3c4 100644 --- a/operator/api/v1alpha1/packages_types.go +++ b/operator/api/v1alpha1/packages_types.go @@ -23,27 +23,36 @@ import ( // ConfigItem defines a configuration item for a module type ConfigItem struct { // DisplayName is the human-readable name of the config item - DisplayName string `json:"displayName"` + // +kubebuilder:validation:Optional + DisplayName string `json:"displayName,omitempty"` // Description provides additional information about the config item - Description string `json:"description"` + // +kubebuilder:validation:Optional + Description string `json:"description,omitempty"` // Type specifies the data type of the config item - Type string `json:"type"` + // +kubebuilder:validation:Optional + Type string `json:"type,omitempty"` // Required indicates whether this config item is mandatory - Required bool `json:"required"` + // +kubebuilder:validation:Optional + Required bool `json:"required,omitempty"` } // Module defines a module within a package type Module struct { // DisplayName is the human-readable name of the module - DisplayName string `json:"displayName"` + // +kubebuilder:validation:Optional + DisplayName string `json:"displayName,omitempty"` // Description provides additional information about the module - Description string `json:"description"` + // +kubebuilder:validation:Optional + Description string `json:"description,omitempty"` // SourceSchema defines the input schema for the module + // +kubebuilder:validation:Optional SourceSchema string `json:"sourceSchema,omitempty"` // SinkSchema defines the output schema for the module + // +kubebuilder:validation:Optional SinkSchema string `json:"sinkSchema,omitempty"` // Config is a list of configuration items for the module - Config []ConfigItem `json:"config,omitempty"` + // +kubebuilder:validation:Optional + Config map[string]ConfigItem `json:"config,omitempty"` } // CloudType defines cloud function package configuration @@ -55,17 +64,21 @@ type CloudType struct { // FunctionType defines the function type configuration type FunctionType struct { // Cloud contains cloud function package configuration + // +kubebuilder:validation:Optional Cloud *CloudType `json:"cloud,omitempty"` } // PackageSpec defines the desired state of Package type PackageSpec struct { // DisplayName is the human-readable name of the package - DisplayName string `json:"displayName"` + // +kubebuilder:validation:Optional + DisplayName string `json:"displayName,omitempty"` // Logo is the URL or base64 encoded image for the package logo + // +kubebuilder:validation:Optional Logo string `json:"logo,omitempty"` // Description provides additional information about the package - Description string `json:"description"` + // +kubebuilder:validation:Optional + Description string `json:"description,omitempty"` // FunctionType contains function type configuration FunctionType FunctionType `json:"functionType"` // Modules is a map of module names to their configurations diff --git a/operator/api/v1alpha1/zz_generated.deepcopy.go b/operator/api/v1alpha1/zz_generated.deepcopy.go index 2c7b512..4601feb 100644 --- a/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -21,6 +21,7 @@ limitations under the License. package v1alpha1 import ( + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -123,13 +124,21 @@ func (in *FunctionSpec) DeepCopyInto(out *FunctionSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } - in.RequestSource.DeepCopyInto(&out.RequestSource) - in.Sink.DeepCopyInto(&out.Sink) + if in.RequestSource != nil { + in, out := &in.RequestSource, &out.RequestSource + *out = new(SourceSpec) + (*in).DeepCopyInto(*out) + } + if in.Sink != nil { + in, out := &in.Sink, &out.Sink + *out = new(SinkSpec) + (*in).DeepCopyInto(*out) + } if in.Config != nil { in, out := &in.Config, &out.Config - *out = make(map[string]string, len(*in)) + *out = make(map[string]v1.JSON, len(*in)) for key, val := range *in { - (*out)[key] = val + (*out)[key] = *val.DeepCopy() } } } @@ -184,8 +193,10 @@ func (in *Module) DeepCopyInto(out *Module) { *out = *in if in.Config != nil { in, out := &in.Config, &out.Config - *out = make([]ConfigItem, len(*in)) - copy(*out, *in) + *out = make(map[string]ConfigItem, len(*in)) + for key, val := range *in { + (*out)[key] = val + } } } diff --git a/operator/cmd/main.go b/operator/cmd/main.go index 21bff8a..5a9f064 100644 --- a/operator/cmd/main.go +++ b/operator/cmd/main.go @@ -95,7 +95,6 @@ func main() { opts.BindFlags(flag.CommandLine) flag.Parse() - // Build Config struct (no need to set env) config := controller.Config{ PulsarServiceURL: pulsarServiceUrl, PulsarAuthPlugin: pulsarAuthPlugin, diff --git a/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml b/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml index 06b8deb..8301885 100644 --- a/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml +++ b/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml @@ -41,7 +41,7 @@ spec: properties: config: additionalProperties: - type: string + x-kubernetes-preserve-unknown-fields: true description: Configurations as key-value pairs type: object description: @@ -62,19 +62,15 @@ spec: pulsar: description: Pulsar source specification properties: - subscriptionName: - description: Subscription name - type: string topic: description: Topic name type: string required: - - subscriptionName - topic type: object type: object sink: - description: sink + description: Sink specifies the sink configuration properties: pulsar: description: Pulsar sink specification @@ -94,18 +90,16 @@ spec: pulsar: description: Pulsar source specification properties: - subscriptionName: - description: Subscription name - type: string topic: description: Topic name type: string required: - - subscriptionName - topic type: object type: object type: array + subscriptionName: + type: string required: - module - package diff --git a/operator/config/crd/bases/fs.functionstream.github.io_packages.yaml b/operator/config/crd/bases/fs.functionstream.github.io_packages.yaml index de65e46..5e2238f 100644 --- a/operator/config/crd/bases/fs.functionstream.github.io_packages.yaml +++ b/operator/config/crd/bases/fs.functionstream.github.io_packages.yaml @@ -71,9 +71,7 @@ spec: description: Module defines a module within a package properties: config: - description: Config is a list of configuration items for the - module - items: + additionalProperties: description: ConfigItem defines a configuration item for a module properties: @@ -93,13 +91,10 @@ spec: description: Type specifies the data type of the config item type: string - required: - - description - - displayName - - required - - type type: object - type: array + description: Config is a list of configuration items for the + module + type: object description: description: Description provides additional information about the module @@ -113,15 +108,10 @@ spec: sourceSchema: description: SourceSchema defines the input schema for the module type: string - required: - - description - - displayName type: object description: Modules is a map of module names to their configurations type: object required: - - description - - displayName - functionType - modules type: object diff --git a/operator/config/rbac/role.yaml b/operator/config/rbac/role.yaml index db82767..79cfd5c 100644 --- a/operator/config/rbac/role.yaml +++ b/operator/config/rbac/role.yaml @@ -4,18 +4,6 @@ kind: ClusterRole metadata: name: manager-role rules: -- apiGroups: - - "" - resources: - - configmaps - verbs: - - create - - delete - - get - - list - - patch - - update - - watch - apiGroups: - apps resources: diff --git a/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml b/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml index 043ce5a..6b9d25c 100755 --- a/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml +++ b/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml @@ -47,7 +47,7 @@ spec: properties: config: additionalProperties: - type: string + x-kubernetes-preserve-unknown-fields: true description: Configurations as key-value pairs type: object description: @@ -68,19 +68,15 @@ spec: pulsar: description: Pulsar source specification properties: - subscriptionName: - description: Subscription name - type: string topic: description: Topic name type: string required: - - subscriptionName - topic type: object type: object sink: - description: sink + description: Sink specifies the sink configuration properties: pulsar: description: Pulsar sink specification @@ -100,18 +96,16 @@ spec: pulsar: description: Pulsar source specification properties: - subscriptionName: - description: Subscription name - type: string topic: description: Topic name type: string required: - - subscriptionName - topic type: object type: object type: array + subscriptionName: + type: string required: - module - package diff --git a/operator/deploy/chart/templates/crd/fs.functionstream.github.io_packages.yaml b/operator/deploy/chart/templates/crd/fs.functionstream.github.io_packages.yaml index 66e771e..00f60da 100755 --- a/operator/deploy/chart/templates/crd/fs.functionstream.github.io_packages.yaml +++ b/operator/deploy/chart/templates/crd/fs.functionstream.github.io_packages.yaml @@ -77,9 +77,7 @@ spec: description: Module defines a module within a package properties: config: - description: Config is a list of configuration items for the - module - items: + additionalProperties: description: ConfigItem defines a configuration item for a module properties: @@ -99,13 +97,10 @@ spec: description: Type specifies the data type of the config item type: string - required: - - description - - displayName - - required - - type type: object - type: array + description: Config is a list of configuration items for the + module + type: object description: description: Description provides additional information about the module @@ -119,15 +114,10 @@ spec: sourceSchema: description: SourceSchema defines the input schema for the module type: string - required: - - description - - displayName type: object description: Modules is a map of module names to their configurations type: object required: - - description - - displayName - functionType - modules type: object diff --git a/operator/deploy/chart/templates/rbac/role.yaml b/operator/deploy/chart/templates/rbac/role.yaml index 00c573e..3ae0961 100755 --- a/operator/deploy/chart/templates/rbac/role.yaml +++ b/operator/deploy/chart/templates/rbac/role.yaml @@ -7,18 +7,6 @@ metadata: {{- include "chart.labels" . | nindent 4 }} name: operator-manager-role rules: -- apiGroups: - - "" - resources: - - configmaps - verbs: - - create - - delete - - get - - list - - patch - - update - - watch - apiGroups: - apps resources: diff --git a/operator/examples/package.yaml b/operator/examples/package.yaml index ce9d125..27125f5 100644 --- a/operator/examples/package.yaml +++ b/operator/examples/package.yaml @@ -25,4 +25,3 @@ spec: properties: result: type: string - config: [] diff --git a/operator/internal/controller/function_controller.go b/operator/internal/controller/function_controller.go index 257a820..11e47c5 100644 --- a/operator/internal/controller/function_controller.go +++ b/operator/internal/controller/function_controller.go @@ -18,9 +18,9 @@ package controller import ( "context" - "crypto/sha256" - "encoding/hex" "fmt" + "github.com/FunctionStream/function-stream/operator/utils" + "k8s.io/apimachinery/pkg/util/json" "reflect" "gopkg.in/yaml.v3" @@ -58,7 +58,6 @@ type FunctionReconciler struct { // +kubebuilder:rbac:groups=fs.functionstream.github.io,resources=functions/status,verbs=get;update;patch // +kubebuilder:rbac:groups=fs.functionstream.github.io,resources=functions/finalizers,verbs=update // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -71,6 +70,7 @@ type FunctionReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.4/pkg/reconcile func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := logf.FromContext(ctx) + log.Info("Reconciling Function", "function", req.NamespacedName) // 1. Get Function var fn fsv1alpha1.Function @@ -95,61 +95,26 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, fmt.Errorf("package %s has no image", fn.Spec.Package) } - // 3. Build ConfigMap data (yaml) - configMapName := fmt.Sprintf("function-%s-config", fn.Name) + // 3. Build config yaml content configYaml, err := buildFunctionConfigYaml(&fn, r.Config) if err != nil { log.Error(err, "Failed to marshal config yaml") return ctrl.Result{}, err } - configMap := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: configMapName, - Namespace: fn.Namespace, - Labels: map[string]string{ - "function": fn.Name, - }, - }, - Data: map[string]string{ - "config.yaml": configYaml, - }, - } - // Set owner - if err := ctrl.SetControllerReference(&fn, configMap, r.Scheme); err != nil { - return ctrl.Result{}, err - } - - // 4. Create or Update ConfigMap - var existingCM corev1.ConfigMap - cmErr := r.Get(ctx, types.NamespacedName{Name: configMapName, Namespace: fn.Namespace}, &existingCM) - if cmErr == nil { - if !reflect.DeepEqual(existingCM.Data, configMap.Data) { - existingCM.Data = configMap.Data - err = r.Update(ctx, &existingCM) - if err != nil { - return ctrl.Result{}, err - } - } - } else if errors.IsNotFound(cmErr) { - err = r.Create(ctx, configMap) - if err != nil { - return ctrl.Result{}, err - } - } else { - return ctrl.Result{}, cmErr - } - - // 5. Calculate ConfigMap hash - hash := sha256.Sum256([]byte(configYaml)) - hashStr := hex.EncodeToString(hash[:])[:32] - // 6. Build Deployment + // 4. Build Deployment deployName := fmt.Sprintf("function-%s", fn.Name) var replicas int32 = 1 labels := map[string]string{ - "function": fn.Name, - "configmap-hash": hashStr, + "function": fn.Name, } + + // Create init command to write config file + initCommand := fmt.Sprintf(`cat > /config/config.yaml << 'EOF' +%s +EOF +`, configYaml) + deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: deployName, @@ -166,22 +131,33 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c Labels: labels, }, Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{{ + Name: "init-config", + Image: image, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/sh", "-c", initCommand}, + VolumeMounts: []corev1.VolumeMount{{ + Name: "function-config", + MountPath: "/config", + }}, + }}, Containers: []corev1.Container{{ Name: "function", Image: image, ImagePullPolicy: corev1.PullIfNotPresent, VolumeMounts: []corev1.VolumeMount{{ Name: "function-config", - MountPath: "/function/config.yaml", - SubPath: "config.yaml", + MountPath: "/config", + }}, + Env: []corev1.EnvVar{{ + Name: "FS_CONFIG_PATH", + Value: "/config/config.yaml", }}, }}, Volumes: []corev1.Volume{{ Name: "function-config", VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{Name: configMapName}, - }, + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }}, }, @@ -192,7 +168,7 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - // 7. Create or Update Deployment + // 5. Create or Update Deployment var existingDeploy appsv1.Deployment deployErr := r.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, &existingDeploy) if deployErr == nil { @@ -203,23 +179,23 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c existingDeploy.Labels = deployment.Labels err = r.Update(ctx, &existingDeploy) if err != nil { - return HandleReconcileError(log, err, "Conflict when updating Deployment, will retry automatically") + return utils.HandleReconcileError(log, err, "Conflict when updating Deployment, will retry automatically") } } } else if errors.IsNotFound(deployErr) { err = r.Create(ctx, deployment) if err != nil { - return HandleReconcileError(log, err, "Conflict when creating Deployment, will retry automatically") + return utils.HandleReconcileError(log, err, "Conflict when creating Deployment, will retry automatically") } } else { return ctrl.Result{}, deployErr } - // 8. Update Function Status from Deployment Status + // 7. Update Function Status from Deployment Status if err := r.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, &existingDeploy); err == nil { fn.Status = convertDeploymentStatusToFunctionStatus(&existingDeploy.Status) if err := r.Status().Update(ctx, &fn); err != nil { - return HandleReconcileError(log, err, "Conflict when updating Function status, will retry automatically") + return utils.HandleReconcileError(log, err, "Conflict when updating Function status, will retry automatically") } } @@ -240,17 +216,30 @@ func buildFunctionConfigYaml(fn *fsv1alpha1.Function, operatorCfg Config) (strin if len(fn.Spec.Sources) > 0 { cfg["sources"] = fn.Spec.Sources } - if fn.Spec.RequestSource.Pulsar != nil { + if fn.Spec.RequestSource != nil { cfg["requestSource"] = fn.Spec.RequestSource } - if fn.Spec.Sink.Pulsar != nil { + if fn.Spec.SubscriptionName != "" { + cfg["subscriptionName"] = fn.Spec.SubscriptionName + } else { + cfg["subscriptionName"] = fmt.Sprintf("fs-%s", fn.Name) + } + if fn.Spec.Sink != nil { cfg["sink"] = fn.Spec.Sink } if fn.Spec.Module != "" { cfg["module"] = fn.Spec.Module } if fn.Spec.Config != nil { - cfg["config"] = fn.Spec.Config + configMap := make(map[string]interface{}) + for k, v := range fn.Spec.Config { + var r interface{} + if err := json.Unmarshal(v.Raw, &r); err != nil { + return "", fmt.Errorf("failed to unmarshal config value for key %s: %w", k, err) + } + configMap[k] = r + } + cfg["config"] = configMap } if fn.Spec.Description != "" { cfg["description"] = fn.Spec.Description @@ -290,7 +279,6 @@ func (r *FunctionReconciler) SetupWithManager(mgr ctrl.Manager) error { functionLabelPredicate := predicate.NewPredicateFuncs(hasFunctionLabel) return ctrl.NewControllerManagedBy(mgr). For(&fsv1alpha1.Function{}). - Owns(&corev1.ConfigMap{}, builder.WithPredicates(functionLabelPredicate)). Owns(&appsv1.Deployment{}, builder.WithPredicates(functionLabelPredicate)). Named("function"). Complete(r) diff --git a/operator/internal/controller/function_controller_test.go b/operator/internal/controller/function_controller_test.go index 2f58cfc..8134871 100644 --- a/operator/internal/controller/function_controller_test.go +++ b/operator/internal/controller/function_controller_test.go @@ -18,8 +18,6 @@ package controller import ( "context" - "crypto/sha256" - "encoding/hex" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -30,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fsv1alpha1 "github.com/FunctionStream/function-stream/operator/api/v1alpha1" - "gopkg.in/yaml.v3" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -71,7 +68,7 @@ var _ = Describe("Function Controller", func() { By("Cleanup the specific resource instance Function") Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) }) - It("should successfully reconcile the resource and create ConfigMap, Deployment, and update Status", func() { + It("should successfully reconcile the resource and create Deployment with init container, and update Status", func() { By("Reconciling the created resource") controllerReconciler := &FunctionReconciler{ Client: k8sClient, @@ -107,8 +104,8 @@ var _ = Describe("Function Controller", func() { patch := client.MergeFrom(function.DeepCopy()) function.Spec.Package = "test-pkg" function.Spec.Module = "mod" - function.Spec.Sink = fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out"}} - function.Spec.RequestSource = fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in", SubscriptionName: "sub"}} + function.Spec.Sink = &fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out"}} + function.Spec.RequestSource = &fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in"}} Expect(k8sClient.Patch(ctx, function, patch)).To(Succeed()) _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ @@ -116,27 +113,48 @@ var _ = Describe("Function Controller", func() { }) Expect(err).NotTo(HaveOccurred()) - // Check ConfigMap - cmName := "function-" + typeNamespacedName.Name + "-config" - cm := &corev1.ConfigMap{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmName, Namespace: typeNamespacedName.Namespace}, cm)).To(Succeed()) - Expect(cm.Data).To(HaveKey("config.yaml")) - - // Assert pulsar config in config.yaml - var configYaml map[string]interface{} - Expect(yaml.Unmarshal([]byte(cm.Data["config.yaml"]), &configYaml)).To(Succeed()) - pulsarCfg, ok := configYaml["pulsar"].(map[string]interface{}) - Expect(ok).To(BeTrue()) - Expect(pulsarCfg["serviceUrl"]).To(Equal("pulsar://test-broker:6650")) - Expect(pulsarCfg["authPlugin"]).To(Equal("org.apache.pulsar.client.impl.auth.AuthenticationToken")) - Expect(pulsarCfg["authParams"]).To(Equal("token:my-token")) - // Check Deployment deployName := "function-" + typeNamespacedName.Name deploy := &appsv1.Deployment{} Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: typeNamespacedName.Namespace}, deploy)).To(Succeed()) - Expect(deploy.Spec.Template.Spec.Containers[0].Image).To(Equal("busybox:latest")) - Expect(deploy.Spec.Template.Spec.Volumes[0].ConfigMap.Name).To(Equal(cmName)) + + // Verify init container exists and has correct configuration + Expect(deploy.Spec.Template.Spec.InitContainers).To(HaveLen(1)) + initContainer := deploy.Spec.Template.Spec.InitContainers[0] + Expect(initContainer.Name).To(Equal("init-config")) + Expect(initContainer.Image).To(Equal("busybox:latest")) + Expect(initContainer.Command).To(HaveLen(3)) + Expect(initContainer.Command[0]).To(Equal("/bin/sh")) + Expect(initContainer.Command[1]).To(Equal("-c")) + + // Verify the init command contains config.yaml content + initCommand := initContainer.Command[2] + Expect(initCommand).To(ContainSubstring("cat > /config/config.yaml")) + + // Verify main container configuration + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + mainContainer := deploy.Spec.Template.Spec.Containers[0] + Expect(mainContainer.Name).To(Equal("function")) + Expect(mainContainer.Image).To(Equal("busybox:latest")) + + // Verify volume mounts + Expect(mainContainer.VolumeMounts).To(HaveLen(1)) + Expect(mainContainer.VolumeMounts[0].Name).To(Equal("function-config")) + Expect(mainContainer.VolumeMounts[0].MountPath).To(Equal("/config")) + + // Verify environment variable + Expect(mainContainer.Env).To(HaveLen(1)) + Expect(mainContainer.Env[0].Name).To(Equal("FS_CONFIG_PATH")) + Expect(mainContainer.Env[0].Value).To(Equal("/config/config.yaml")) + + // Verify volumes + Expect(deploy.Spec.Template.Spec.Volumes).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Volumes[0].Name).To(Equal("function-config")) + Expect(deploy.Spec.Template.Spec.Volumes[0].EmptyDir).NotTo(BeNil()) + + // Verify labels + Expect(deploy.Labels).To(HaveKey("function")) + Expect(deploy.Labels["function"]).To(Equal(typeNamespacedName.Name)) // Simulate Deployment status update patchDeploy := client.MergeFrom(deploy.DeepCopy()) @@ -161,23 +179,25 @@ var _ = Describe("Function Controller", func() { Expect(fn.Status.UpdatedReplicas).To(Equal(int32(1))) Expect(fn.Status.ObservedGeneration).To(Equal(int64(2))) - // Simulate ConfigMap change and check Deployment hash label update - patchCM := client.MergeFrom(cm.DeepCopy()) - cm.Data["config.yaml"] = cm.Data["config.yaml"] + "#changed" - Expect(k8sClient.Patch(ctx, cm, patchCM)).To(Succeed()) - // Force re-get to ensure the content has changed - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmName, Namespace: typeNamespacedName.Namespace}, cm)).To(Succeed()) + // Test deployment update when function spec changes + // Update function spec to trigger deployment update + patchFn := client.MergeFrom(fn.DeepCopy()) + fn.Spec.Description = "Updated description" + Expect(k8sClient.Patch(ctx, fn, patchFn)).To(Succeed()) + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: typeNamespacedName, }) Expect(err).NotTo(HaveOccurred()) + + // Verify deployment was updated Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: typeNamespacedName.Namespace}, deploy)).To(Succeed()) - // hash label should change - Expect(deploy.Labels).To(HaveKey("configmap-hash")) + // The deployment should still exist and be updated + Expect(deploy).NotTo(BeNil()) }) - It("should only reconcile when ConfigMap has 'function' label", func() { - By("setting up a Function and its labeled ConfigMap") + It("should only reconcile when Deployment has 'function' label", func() { + By("setting up a Function and its labeled Deployment") controllerReconciler := &FunctionReconciler{ Client: k8sClient, Scheme: k8sClient.Scheme(), @@ -210,67 +230,60 @@ var _ = Describe("Function Controller", func() { Namespace: "default", }, Spec: fsv1alpha1.FunctionSpec{ - Package: "test-pkg-label", - Module: "mod", - Sink: fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out"}}, - RequestSource: fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in", SubscriptionName: "sub"}}, + Package: "test-pkg-label", + Module: "mod", + SubscriptionName: "sub", + Sink: &fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out"}}, + RequestSource: &fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in"}}, }, } Expect(k8sClient.Create(ctx, fn)).To(Succeed()) - // Initial reconcile to create ConfigMap and Deployment + // Initial reconcile to create Deployment _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: types.NamespacedName{Name: fn.Name, Namespace: fn.Namespace}, }) Expect(err).NotTo(HaveOccurred()) - cmName := "function-" + fn.Name + "-config" - cm := &corev1.ConfigMap{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmName, Namespace: fn.Namespace}, cm)).To(Succeed()) - oldHash := sha256sum(cm.Data["config.yaml"]) - - // Patch labeled ConfigMap, should NOT trigger reconcile or hash change - patchCM := client.MergeFrom(cm.DeepCopy()) - cm.Data["config.yaml"] = cm.Data["config.yaml"] + "#changed" - Expect(k8sClient.Patch(ctx, cm, patchCM)).To(Succeed()) - // Force re-get to ensure the content has changed - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cmName, Namespace: fn.Namespace}, cm)).To(Succeed()) - _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: types.NamespacedName{Name: fn.Name, Namespace: fn.Namespace}, - }) - Expect(err).NotTo(HaveOccurred()) + deployName := "function-" + fn.Name deploy := &appsv1.Deployment{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "function-" + fn.Name, Namespace: fn.Namespace}, deploy)).To(Succeed()) - newHash := deploy.Labels["configmap-hash"] - Expect(newHash).To(Equal(oldHash)) + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, deploy)).To(Succeed()) - // Create a ConfigMap without 'function' label - unlabeledCM := &corev1.ConfigMap{ + // Create a Deployment without 'function' label + unlabeledDeploy := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: "unlabeled-cm", + Name: "unlabeled-deploy", Namespace: fn.Namespace, + Labels: map[string]string{"app": "test"}, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &[]int32{1}[0], + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "test"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "test", + Image: "busybox:latest", + }}, + }, + }, }, - Data: map[string]string{"foo": "bar"}, } - Expect(k8sClient.Create(ctx, unlabeledCM)).To(Succeed()) - // Patch unlabeled ConfigMap, should NOT trigger reconcile or hash change - patchUnlabeled := client.MergeFrom(unlabeledCM.DeepCopy()) - unlabeledCM.Data["foo"] = "baz" - Expect(k8sClient.Patch(ctx, unlabeledCM, patchUnlabeled)).To(Succeed()) + Expect(k8sClient.Create(ctx, unlabeledDeploy)).To(Succeed()) + // Manually call Reconcile to simulate the event, but the hash should not change _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ NamespacedName: types.NamespacedName{Name: fn.Name, Namespace: fn.Namespace}, }) Expect(err).NotTo(HaveOccurred()) + // Get Deployment again, the hash should remain unchanged - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "function-" + fn.Name, Namespace: fn.Namespace}, deploy)).To(Succeed()) - Expect(deploy.Labels["configmap-hash"]).To(Equal(newHash)) + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, deploy)).To(Succeed()) }) }) }) - -// Utility function: first 32 characters of sha256 -func sha256sum(s string) string { - hash := sha256.Sum256([]byte(s)) - return hex.EncodeToString(hash[:])[:32] -} diff --git a/operator/internal/controller/packages_controller_test.go b/operator/internal/controller/packages_controller_test.go index e530d25..a9890e9 100644 --- a/operator/internal/controller/packages_controller_test.go +++ b/operator/internal/controller/packages_controller_test.go @@ -55,7 +55,7 @@ var _ = Describe("Package Controller", func() { DisplayName: "test", Description: "desc", FunctionType: fsv1alpha1.FunctionType{}, - Modules: map[string]fsv1alpha1.Module{}, + Modules: map[string]fsv1alpha1.Module{"mod": {DisplayName: "mod", Description: "desc"}}, }, } Expect(k8sClient.Create(ctx, resource)).To(Succeed()) diff --git a/operator/internal/controller/util.go b/operator/utils/util.go similarity index 96% rename from operator/internal/controller/util.go rename to operator/utils/util.go index 7d9a55b..67fd165 100644 --- a/operator/internal/controller/util.go +++ b/operator/utils/util.go @@ -1,4 +1,4 @@ -package controller +package utils import ( "github.com/go-logr/logr"