From ab5f19391088701d6bc5d6d814143d46b9c67656 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Sat, 9 Aug 2025 18:05:57 +0000 Subject: [PATCH 1/8] Feat: Support role upgrade sequences in stormservice Signed-off-by: Omer Aplatony --- api/orchestration/v1alpha1/roleset_types.go | 7 ++ .../v1alpha1/zz_generated.deepcopy.go | 5 + .../orchestration.aibrix.ai_rolesets.yaml | 4 + ...orchestration.aibrix.ai_stormservices.yaml | 4 + pkg/controller/roleset/rolling.go | 9 +- pkg/controller/roleset/utils.go | 17 +++ pkg/controller/roleset/utils_test.go | 105 ++++++++++++++++++ 7 files changed, 148 insertions(+), 3 deletions(-) diff --git a/api/orchestration/v1alpha1/roleset_types.go b/api/orchestration/v1alpha1/roleset_types.go index daff520a6..2937c4835 100644 --- a/api/orchestration/v1alpha1/roleset_types.go +++ b/api/orchestration/v1alpha1/roleset_types.go @@ -64,6 +64,13 @@ type RoleSpec struct { // +optional Replicas *int32 `json:"replicas,omitempty"` + // UpgradeOrder specifies the order in which this role should be upgraded. + // Lower values are upgraded first. If not specified, defaults to 0. + // +optional + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:validation:Type=integer + UpgradeOrder *int32 `json:"upgradeOrder,omitempty"` + // PodGroupSize is the number of pods to form a minimum role instance. // +optional PodGroupSize *int32 `json:"podGroupSize,omitempty"` diff --git a/api/orchestration/v1alpha1/zz_generated.deepcopy.go b/api/orchestration/v1alpha1/zz_generated.deepcopy.go index 70b8d3127..fae0e3264 100644 --- a/api/orchestration/v1alpha1/zz_generated.deepcopy.go +++ b/api/orchestration/v1alpha1/zz_generated.deepcopy.go @@ -671,6 +671,11 @@ func (in *RoleSpec) DeepCopyInto(out *RoleSpec) { *out = new(int32) **out = **in } + if in.UpgradeOrder != nil { + in, out := &in.UpgradeOrder, &out.UpgradeOrder + *out = new(int32) + **out = **in + } if in.PodGroupSize != nil { in, out := &in.PodGroupSize, &out.PodGroupSize *out = new(int32) diff --git a/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml b/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml index 6f3ae70e5..baae07536 100644 --- a/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml +++ b/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml @@ -3687,6 +3687,10 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + upgradeOrder: + format: int32 + minimum: 0 + type: integer type: object type: array schedulingStrategy: diff --git a/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml b/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml index c7b67a918..dcfe46cd0 100644 --- a/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml +++ b/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml @@ -3754,6 +3754,10 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + upgradeOrder: + format: int32 + minimum: 0 + type: integer type: object type: array schedulingStrategy: diff --git a/pkg/controller/roleset/rolling.go b/pkg/controller/roleset/rolling.go index 89cd6b296..d55e11c1b 100644 --- a/pkg/controller/roleset/rolling.go +++ b/pkg/controller/roleset/rolling.go @@ -50,9 +50,12 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat klog.Infof("[RollingManagerSequential.Next] waiting for roleset %s/%s to be scaled", roleSet.Namespace, roleSet.Name) return nil } - // 2. do the rollout process for each role - // TODO: in future, consider the rollout sequence based on the role's priority - for _, role := range roleSet.Spec.Roles { + + // 2. Sort roles by upgrade order + sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles) + + // 3. do the rollout process for each role by order + for _, role := range sortedRoles { klog.Infof("[RollingManagerSequential.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name) err := GetRoleSyncer(m.cli, &role).Rollout(ctx, roleSet, &role) if err != nil { diff --git a/pkg/controller/roleset/utils.go b/pkg/controller/roleset/utils.go index 8270557e1..4d233bf91 100644 --- a/pkg/controller/roleset/utils.go +++ b/pkg/controller/roleset/utils.go @@ -377,3 +377,20 @@ func deletePodsInBatch(ctx context.Context, cli client.Client, podsToDelete []*v return cli.Delete(ctx, podsToDelete[index]) }) } + +func sortRolesByUpgradeOrder(roles []orchestrationv1alpha1.RoleSpec) []orchestrationv1alpha1.RoleSpec { + sortedRoles := make([]orchestrationv1alpha1.RoleSpec, len(roles)) + copy(sortedRoles, roles) + sort.Slice(sortedRoles, func(i, j int) bool { + orderI := int32(0) + if sortedRoles[i].UpgradeOrder != nil { + orderI = *sortedRoles[i].UpgradeOrder + } + orderJ := int32(0) + if sortedRoles[j].UpgradeOrder != nil { + orderJ = *sortedRoles[j].UpgradeOrder + } + return orderI < orderJ + }) + return sortedRoles +} diff --git a/pkg/controller/roleset/utils_test.go b/pkg/controller/roleset/utils_test.go index b1f1b4efc..30b05ebb2 100644 --- a/pkg/controller/roleset/utils_test.go +++ b/pkg/controller/roleset/utils_test.go @@ -17,6 +17,7 @@ limitations under the License. package roleset import ( + "reflect" "testing" "github.com/stretchr/testify/assert" @@ -292,3 +293,107 @@ func makeNotReadyPod(name string) *corev1.Pod { pod.Status.Conditions[0].Status = corev1.ConditionFalse return pod } + +func TestSortRolesByUpgradeOrder(t *testing.T) { + int32Ptr := func(i int32) *int32 { return &i } + + tests := []struct { + name string + roles []orchestrationv1alpha1.RoleSpec + expected []orchestrationv1alpha1.RoleSpec + }{ + { + name: "empty roles", + roles: []orchestrationv1alpha1.RoleSpec{}, + expected: []orchestrationv1alpha1.RoleSpec{}, + }, + { + name: "already sorted roles", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "role1", UpgradeOrder: int32Ptr(1)}, + {Name: "role2", UpgradeOrder: int32Ptr(2)}, + {Name: "role3", UpgradeOrder: int32Ptr(3)}, + }, + expected: []orchestrationv1alpha1.RoleSpec{ + {Name: "role1", UpgradeOrder: int32Ptr(1)}, + {Name: "role2", UpgradeOrder: int32Ptr(2)}, + {Name: "role3", UpgradeOrder: int32Ptr(3)}, + }, + }, + { + name: "unsorted roles", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "role3", UpgradeOrder: int32Ptr(3)}, + {Name: "role1", UpgradeOrder: int32Ptr(1)}, + {Name: "role2", UpgradeOrder: int32Ptr(2)}, + }, + expected: []orchestrationv1alpha1.RoleSpec{ + {Name: "role1", UpgradeOrder: int32Ptr(1)}, + {Name: "role2", UpgradeOrder: int32Ptr(2)}, + {Name: "role3", UpgradeOrder: int32Ptr(3)}, + }, + }, + { + name: "roles with nil upgrade order", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "role3", UpgradeOrder: int32Ptr(2)}, + {Name: "role1", UpgradeOrder: nil}, + {Name: "role2", UpgradeOrder: int32Ptr(1)}, + }, + expected: []orchestrationv1alpha1.RoleSpec{ + {Name: "role1", UpgradeOrder: nil}, + {Name: "role2", UpgradeOrder: int32Ptr(1)}, + {Name: "role3", UpgradeOrder: int32Ptr(2)}, + }, + }, + { + name: "roles with same upgrade order", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "role1", UpgradeOrder: int32Ptr(1)}, + {Name: "role2", UpgradeOrder: int32Ptr(1)}, + {Name: "role3", UpgradeOrder: int32Ptr(1)}, + }, + expected: []orchestrationv1alpha1.RoleSpec{ + {Name: "role1", UpgradeOrder: int32Ptr(1)}, + {Name: "role2", UpgradeOrder: int32Ptr(1)}, + {Name: "role3", UpgradeOrder: int32Ptr(1)}, + }, + }, + { + name: "mix of nil and non-nil upgrade orders", + roles: []orchestrationv1alpha1.RoleSpec{ + {Name: "role4", UpgradeOrder: int32Ptr(2)}, + {Name: "role1", UpgradeOrder: nil}, + {Name: "role2", UpgradeOrder: nil}, + {Name: "role3", UpgradeOrder: int32Ptr(1)}, + }, + expected: []orchestrationv1alpha1.RoleSpec{ + {Name: "role1", UpgradeOrder: nil}, + {Name: "role2", UpgradeOrder: nil}, + {Name: "role3", UpgradeOrder: int32Ptr(1)}, + {Name: "role4", UpgradeOrder: int32Ptr(2)}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a copy of input roles to verify the original slice is not modified + originalRoles := make([]orchestrationv1alpha1.RoleSpec, len(tt.roles)) + copy(originalRoles, tt.roles) + + result := sortRolesByUpgradeOrder(tt.roles) + t.Logf("result len %d", len(result)) + + // Check if the result matches expected + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("sortRolesByUpgradeOrder() = %v, want %v", result, tt.expected) + } + + // Verify the original slice was not modified + if !reflect.DeepEqual(tt.roles, originalRoles) { + t.Errorf("Original roles were modified: got %v, want %v", tt.roles, originalRoles) + } + }) + } +} From b6ec49ac40f06caed287920d5e1ba48fcbce0f2d Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Sun, 10 Aug 2025 13:00:27 +0000 Subject: [PATCH 2/8] Add logs Signed-off-by: Omer Aplatony --- pkg/controller/roleset/rolling.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/controller/roleset/rolling.go b/pkg/controller/roleset/rolling.go index d55e11c1b..12d6a6a74 100644 --- a/pkg/controller/roleset/rolling.go +++ b/pkg/controller/roleset/rolling.go @@ -52,7 +52,12 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat } // 2. Sort roles by upgrade order + klog.Infof("[RollingManagerSequential.Next] sorting roleset roles by UpgradeOrder") sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles) + klog.Infof("[RollingManagerSequential.Next] Upgrade sequence for roleset %s/%s:", roleSet.Namespace, roleSet.Name) + for i, role := range sortedRoles { + klog.Infof("[RollingManagerSequential.Next] [%d] Role: %s (UpgradeOrder: %d)", i+1, role.Name, *role.UpgradeOrder) + } // 3. do the rollout process for each role by order for _, role := range sortedRoles { From b35cc9380979ed4710ed62322672f2826321c4d3 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Sun, 10 Aug 2025 16:08:15 +0300 Subject: [PATCH 3/8] used SliceStable instead of Slice Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Omer Aplatony --- pkg/controller/roleset/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/roleset/utils.go b/pkg/controller/roleset/utils.go index 4d233bf91..b91e72197 100644 --- a/pkg/controller/roleset/utils.go +++ b/pkg/controller/roleset/utils.go @@ -381,7 +381,7 @@ func deletePodsInBatch(ctx context.Context, cli client.Client, podsToDelete []*v func sortRolesByUpgradeOrder(roles []orchestrationv1alpha1.RoleSpec) []orchestrationv1alpha1.RoleSpec { sortedRoles := make([]orchestrationv1alpha1.RoleSpec, len(roles)) copy(sortedRoles, roles) - sort.Slice(sortedRoles, func(i, j int) bool { + sort.SliceStable(sortedRoles, func(i, j int) bool { orderI := int32(0) if sortedRoles[i].UpgradeOrder != nil { orderI = *sortedRoles[i].UpgradeOrder From 93665e3b1ddc76e00a040302168e2034ba7c6903 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Sun, 10 Aug 2025 13:09:39 +0000 Subject: [PATCH 4/8] fixed derefrence pointer Signed-off-by: Omer Aplatony --- pkg/controller/roleset/rolling.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/controller/roleset/rolling.go b/pkg/controller/roleset/rolling.go index 12d6a6a74..94d0217e5 100644 --- a/pkg/controller/roleset/rolling.go +++ b/pkg/controller/roleset/rolling.go @@ -56,7 +56,11 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles) klog.Infof("[RollingManagerSequential.Next] Upgrade sequence for roleset %s/%s:", roleSet.Namespace, roleSet.Name) for i, role := range sortedRoles { - klog.Infof("[RollingManagerSequential.Next] [%d] Role: %s (UpgradeOrder: %d)", i+1, role.Name, *role.UpgradeOrder) + order := int32(0) + if role.UpgradeOrder != nil { + order = *role.UpgradeOrder + } + klog.Infof("[RollingManagerSequential.Next] [%d] Role: %s (UpgradeOrder: %d)", i+1, role.Name, order) } // 3. do the rollout process for each role by order From 9873132194ce40e389f199b8b120956cee52f93c Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Wed, 13 Aug 2025 17:57:27 +0000 Subject: [PATCH 5/8] set default 0 in crd Signed-off-by: Omer Aplatony --- api/orchestration/v1alpha1/roleset_types.go | 1 + .../orchestration.aibrix.ai_rolesets.yaml | 1 + .../orchestration.aibrix.ai_stormservices.yaml | 1 + pkg/controller/roleset/rolling.go | 11 +++++++++-- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/api/orchestration/v1alpha1/roleset_types.go b/api/orchestration/v1alpha1/roleset_types.go index 2937c4835..3631e5460 100644 --- a/api/orchestration/v1alpha1/roleset_types.go +++ b/api/orchestration/v1alpha1/roleset_types.go @@ -69,6 +69,7 @@ type RoleSpec struct { // +optional // +kubebuilder:validation:Minimum=0 // +kubebuilder:validation:Type=integer + // +kubebuilder:default:=0 UpgradeOrder *int32 `json:"upgradeOrder,omitempty"` // PodGroupSize is the number of pods to form a minimum role instance. diff --git a/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml b/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml index baae07536..81a0e5c96 100644 --- a/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml +++ b/config/crd/orchestration/orchestration.aibrix.ai_rolesets.yaml @@ -3688,6 +3688,7 @@ spec: x-kubernetes-int-or-string: true type: object upgradeOrder: + default: 0 format: int32 minimum: 0 type: integer diff --git a/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml b/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml index dcfe46cd0..ae9835ba5 100644 --- a/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml +++ b/config/crd/orchestration/orchestration.aibrix.ai_stormservices.yaml @@ -3755,6 +3755,7 @@ spec: x-kubernetes-int-or-string: true type: object upgradeOrder: + default: 0 format: int32 minimum: 0 type: integer diff --git a/pkg/controller/roleset/rolling.go b/pkg/controller/roleset/rolling.go index 94d0217e5..37198bca9 100644 --- a/pkg/controller/roleset/rolling.go +++ b/pkg/controller/roleset/rolling.go @@ -18,7 +18,9 @@ package roleset import ( "context" + "fmt" "math" + "strings" orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1" @@ -54,15 +56,20 @@ func (m *RollingManagerSequential) Next(ctx context.Context, roleSet *orchestrat // 2. Sort roles by upgrade order klog.Infof("[RollingManagerSequential.Next] sorting roleset roles by UpgradeOrder") sortedRoles := sortRolesByUpgradeOrder(roleSet.Spec.Roles) - klog.Infof("[RollingManagerSequential.Next] Upgrade sequence for roleset %s/%s:", roleSet.Namespace, roleSet.Name) + var sequenceLines []string for i, role := range sortedRoles { order := int32(0) if role.UpgradeOrder != nil { order = *role.UpgradeOrder } - klog.Infof("[RollingManagerSequential.Next] [%d] Role: %s (UpgradeOrder: %d)", i+1, role.Name, order) + sequenceLines = append(sequenceLines, fmt.Sprintf("[%d] %s (Order=%d)", i+1, role.Name, order)) } + klog.Infof("[RollingManagerSequential.Next] Upgrade sequence for %s/%s:\n%s", + roleSet.Namespace, + roleSet.Name, + strings.Join(sequenceLines, "\n")) + // 3. do the rollout process for each role by order for _, role := range sortedRoles { klog.Infof("[RollingManagerSequential.Next] start to rollout roleset %s/%s role %s", roleSet.Namespace, roleSet.Name, role.Name) From ac3005a978ca5e4e1b03b2b389fe8a85e0b6f084 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Wed, 13 Aug 2025 21:21:56 +0000 Subject: [PATCH 6/8] update tests Signed-off-by: Omer Aplatony --- test/integration/webhook/stormservice_webhook_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/integration/webhook/stormservice_webhook_test.go b/test/integration/webhook/stormservice_webhook_test.go index 6be82c61c..fa85b481e 100644 --- a/test/integration/webhook/stormservice_webhook_test.go +++ b/test/integration/webhook/stormservice_webhook_test.go @@ -115,6 +115,7 @@ var _ = ginkgo.Describe("stormservice default webhook", func() { { Name: "worker", Replicas: ptr.To(int32(1)), + UpgradeOrder: ptr.To(int32(1)), Stateful: false, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -204,6 +205,7 @@ var _ = ginkgo.Describe("stormservice default webhook", func() { { Name: "master", Replicas: ptr.To(int32(1)), + UpgradeOrder: ptr.To(int32(1)), Stateful: true, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -328,6 +330,7 @@ func makeStormServiceWithNoSidecarInjection(name, namespace string, { Name: "worker", Replicas: ptr.To(int32(1)), + UpgradeOrder: ptr.To(int32(0)), Stateful: false, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -362,6 +365,7 @@ func makeStormServiceWithNoSidecarInjection(name, namespace string, { Name: "master", Replicas: ptr.To(int32(1)), + UpgradeOrder: ptr.To(int32(1)), Stateful: true, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ From 2270fa6525bf376cd535ced32edb2a954ec874a4 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Wed, 13 Aug 2025 21:34:27 +0000 Subject: [PATCH 7/8] update tests Signed-off-by: Omer Aplatony --- test/integration/webhook/stormservice_webhook_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/webhook/stormservice_webhook_test.go b/test/integration/webhook/stormservice_webhook_test.go index fa85b481e..a32f7d90b 100644 --- a/test/integration/webhook/stormservice_webhook_test.go +++ b/test/integration/webhook/stormservice_webhook_test.go @@ -330,7 +330,7 @@ func makeStormServiceWithNoSidecarInjection(name, namespace string, { Name: "worker", Replicas: ptr.To(int32(1)), - UpgradeOrder: ptr.To(int32(0)), + UpgradeOrder: ptr.To(int32(1)), Stateful: false, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ From cc1485c1f9ba63ae3301057cca0d345875f80ac9 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Wed, 13 Aug 2025 21:39:16 +0000 Subject: [PATCH 8/8] fmt Signed-off-by: Omer Aplatony --- .../webhook/stormservice_webhook_test.go | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/integration/webhook/stormservice_webhook_test.go b/test/integration/webhook/stormservice_webhook_test.go index a32f7d90b..ec0fa9566 100644 --- a/test/integration/webhook/stormservice_webhook_test.go +++ b/test/integration/webhook/stormservice_webhook_test.go @@ -113,10 +113,10 @@ var _ = ginkgo.Describe("stormservice default webhook", func() { UpdateStrategy: orchestrationapi.InterleaveRoleSetStrategyType, Roles: []orchestrationapi.RoleSpec{ { - Name: "worker", - Replicas: ptr.To(int32(1)), + Name: "worker", + Replicas: ptr.To(int32(1)), UpgradeOrder: ptr.To(int32(1)), - Stateful: false, + Stateful: false, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -203,10 +203,10 @@ var _ = ginkgo.Describe("stormservice default webhook", func() { }, }, { - Name: "master", - Replicas: ptr.To(int32(1)), + Name: "master", + Replicas: ptr.To(int32(1)), UpgradeOrder: ptr.To(int32(1)), - Stateful: true, + Stateful: true, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -328,10 +328,10 @@ func makeStormServiceWithNoSidecarInjection(name, namespace string, UpdateStrategy: orchestrationapi.InterleaveRoleSetStrategyType, Roles: []orchestrationapi.RoleSpec{ { - Name: "worker", - Replicas: ptr.To(int32(1)), + Name: "worker", + Replicas: ptr.To(int32(1)), UpgradeOrder: ptr.To(int32(1)), - Stateful: false, + Stateful: false, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -363,10 +363,10 @@ func makeStormServiceWithNoSidecarInjection(name, namespace string, }, }, { - Name: "master", - Replicas: ptr.To(int32(1)), + Name: "master", + Replicas: ptr.To(int32(1)), UpgradeOrder: ptr.To(int32(1)), - Stateful: true, + Stateful: true, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{