Skip to content

Add conformance test to verify resolution of conflicting service types #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions conformance/clusterip_service_dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ var _ = Describe("", Label(OptionalLabel, DNSLabel, ClusterIPLabel), func() {

serviceImports := []*v1alpha1.ServiceImport{}
for _, client := range clients {
serviceImport := t.awaitServiceImport(&client, t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
return len(serviceImport.Spec.IPs) > 0
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found on cluster %q", client.name)
Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name)
serviceImport := t.awaitServiceImport(&client, t.helloService.Name, false,
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name)
})
serviceImports = append(serviceImports, serviceImport)
}

Expand Down
32 changes: 20 additions & 12 deletions conformance/conformance_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
rest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -227,22 +226,31 @@ func (t *testDriver) getServiceImport(c *clusterClients, name string) *v1alpha1.
return si
}

func (t *testDriver) awaitServiceImport(c *clusterClients, name string, verify func(*v1alpha1.ServiceImport) bool) *v1alpha1.ServiceImport {
func (t *testDriver) awaitServiceImport(c *clusterClients, name string, reportNonConformanceOnMissing bool,
verify func(Gomega, *v1alpha1.ServiceImport)) *v1alpha1.ServiceImport {
var serviceImport *v1alpha1.ServiceImport

_ = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond,
20*time.Second, true, func(ctx context.Context) (bool, error) {
defer GinkgoRecover()
Eventually(func(g Gomega) {
si := t.getServiceImport(c, name)

si := t.getServiceImport(c, name)
if si == nil {
return false, nil
}
missingMsg := fmt.Sprintf("ServiceImport was not found on cluster %q", c.name)

serviceImport = si
var missing any = missingMsg
if reportNonConformanceOnMissing {
missing = reportNonConformant(missingMsg)
}

return verify == nil || verify(serviceImport), nil
})
g.Expect(si).NotTo(BeNil(), missing)

serviceImport = si

if verify != nil {
verify(g, serviceImport)
}

// The final run succeeded so cancel any prior non-conformance reported.
cancelNonConformanceReport()
}).Within(20 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed())

return serviceImport
}
Expand Down
136 changes: 72 additions & 64 deletions conformance/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
_ = Describe("", Label(ClusterIPLabel), testClusterIPServiceImport)
_ = Describe("", Label(HeadlessLabel), testHeadlessServiceImport)
_ = Describe("", Label(ExternalNameLabel), testExternalNameService)
_ = Describe("", testServiceTypeConflict)
)

func testGeneralServiceImport() {
Expand All @@ -47,14 +48,15 @@ func testGeneralServiceImport() {
t.createServiceExport(&clients[0], helloServiceExport)
})

assertHasKeyValues := func(actual, expected map[string]string) {
assertHasKeyValues := func(g Gomega, actual, expected map[string]string) {
for k, v := range expected {
Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant(""))
g.Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant(""))
}
}
assertNotHasKeyValues := func(actual, expected map[string]string) {

assertNotHasKeyValues := func(g Gomega, actual, expected map[string]string) {
for k, v := range expected {
Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant(""))
g.Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant(""))
}
}

Expand All @@ -66,10 +68,7 @@ func testGeneralServiceImport() {

AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/blob/master/keps/sig-multicluster/1645-multi-cluster-services-api/README.md#importing-services")

serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
return true
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil)

By(fmt.Sprintf("Exporting the service on the second cluster %q", clients[1].name))

Expand Down Expand Up @@ -109,16 +108,14 @@ func testGeneralServiceImport() {
Label(OptionalLabel), Label(ExportedLabelsLabel), func() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#labels-and-annotations")

serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool {
return len(serviceImport.Labels) > 0
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
t.awaitServiceImport(&clients[0], helloServiceName, false,
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations)
assertNotHasKeyValues(g, serviceImport.Annotations, t.helloService.Annotations)

assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations)
assertNotHasKeyValues(serviceImport.Annotations, t.helloService.Annotations)

assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels)
assertNotHasKeyValues(serviceImport.Labels, t.helloService.Labels)
assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels)
assertNotHasKeyValues(g, serviceImport.Labels, t.helloService.Labels)
})
})
})

Expand All @@ -141,16 +138,14 @@ func testGeneralServiceImport() {
t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)

serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
return len(serviceImport.Labels) > 0
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")

assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations)
assertNotHasKeyValues(serviceImport.Annotations, tt.helloServiceExport2.Annotations)
t.awaitServiceImport(&clients[0], t.helloService.Name, false,
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations)
assertNotHasKeyValues(g, serviceImport.Annotations, tt.helloServiceExport2.Annotations)

assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels)
assertNotHasKeyValues(serviceImport.Labels, tt.helloServiceExport2.Labels)
assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels)
assertNotHasKeyValues(g, serviceImport.Labels, tt.helloServiceExport2.Labels)
})
})
})
})
Expand All @@ -173,9 +168,7 @@ func testClusterIPServiceImport() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#importing-services")

for i := range clients {
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil)
Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q",
clients[i].name)))
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil)

Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant(
fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type)))
Expand All @@ -195,37 +188,32 @@ func testClusterIPServiceImport() {
Label(RequiredLabel), func() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#session-affinity")

serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, nil)
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
g.Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant(""))

Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant(""))

Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant(
"The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig"))
g.Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant(
"The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig"))
})
})

Specify("An IP should be allocated for a ClusterSetIP ServiceImport", Label(RequiredLabel), func() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip")

serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
return len(serviceImport.Spec.IPs) > 0
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, false,
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant(""))
})

Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant(""))
Expect(net.ParseIP(serviceImport.Spec.IPs[0])).ToNot(BeNil(),
reportNonConformant(fmt.Sprintf("The value %q is not a valid IP", serviceImport.Spec.IPs[0])))
})

Specify("The ports for a ClusterSetIP ServiceImport should match those of the exported service", Label(RequiredLabel), func() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port")

serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool {
return len(serviceImport.Spec.Ports) > 0
t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant(""))
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")

Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant(""))
})

Context("A ClusterIP service exported on two clusters", func() {
Expand All @@ -246,13 +234,11 @@ func testClusterIPServiceImport() {
Specify("should expose the union of the constituent service ports", Label(RequiredLabel), func() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port")

serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
return len(serviceImport.Spec.Ports) == 3
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")

Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(
append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant(""))
t.awaitServiceImport(&clients[0], t.helloService.Name, false,
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(
append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant(""))
})
})
})

Expand All @@ -268,13 +254,11 @@ func testClusterIPServiceImport() {
t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)

serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
return len(serviceImport.Spec.Ports) == len(t.helloService.Spec.Ports)
})
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")

Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)),
reportNonConformant("The service ports were not resolved correctly"))
t.awaitServiceImport(&clients[0], t.helloService.Name, false,
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)),
reportNonConformant("The service ports were not resolved correctly"))
})
})
})
})
Expand All @@ -296,9 +280,7 @@ func testHeadlessServiceImport() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-types")

for i := range clients {
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil)
Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q",
clients[i].name)))
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil)

Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.Headless), reportNonConformant(
fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type)))
Expand All @@ -317,8 +299,7 @@ func testHeadlessServiceImport() {
Specify("No clusterset IP should be allocated for a Headless ServiceImport", Label(RequiredLabel), func() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip")

serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, nil)
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil)

Consistently(func() []string {
return t.getServiceImport(&clients[0], t.helloService.Name).Spec.IPs
Expand Down Expand Up @@ -346,3 +327,30 @@ func testExternalNameService() {
"the ServiceImport should not exist for an ExternalName service")
})
}

func testServiceTypeConflict() {
t := newTwoClusterTestDriver(newTestDriver())

BeforeEach(func() {
t.helloService2.Spec.ClusterIP = corev1.ClusterIPNone
})

JustBeforeEach(func() {
t.createServiceExport(&clients[0], newHelloServiceExport())
})

Specify("A service exported on two clusters with conflicting headlessness should apply the conflict resolution policy and "+
"report a Conflict condition on the ServiceExport", Label(RequiredLabel), func() {
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#headlessness")

t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW shouldn't this be done on both clusters?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KEP says this for reference:

Conflict resolution policy: If any properties have conflicting values that can not simply be merged, a ServiceExportConflict condition will be set on all ServiceExports for the conflicted service with a description of the conflict.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and it's also what we are doing elsewhere it seems)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know but that's problematic unless every cluster has access to the ServiceExports on every other cluster or there's a central controller that has access to all. Submariner has neither. This is an assumption that the KEP makes that it really shouldn't that's been discussed in the past. Plus I don't think it really needs to be on every ServiceExport in this case. The important thing is that it's on the ServiceExport in the cluster that is actually in conflict.

Copy link
Member

@MrFreezeex MrFreezeex Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes ok I see, well to me the conformance test should have consistent tests so just for this matter I think it should have the test for both cluster because I don't think the other conflict tests are any different.

That being said I would be supportive of loosing the tone on the KEP to accommodate your need (for instance saying that implementation are required to have a conflict condition on the "loosing service export" but that it's recommended to have it on every ServiceExport if feasible) and to change all tests doing similar check once merged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought we decided to relax the statement? for some implementation, applying the conditions on every exporting clusters are not feasible, and not necessary. The conflicted service from other clusters should not stop/impact my existing exporting traffic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhiying-lin There was a discussion re: relaxing it on the SIG call but there was pushback by some folks. I'm not pursuing it any more from the Submariner side as I found a straightforward way we can handle it but, if you have an implementation facing the same difficulty, you can revive the discussion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please share the conversation link with me? i missed that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the meeting was recorded but I don't know where to access them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see, the recent recording was not uploaded, i;ll bring up in today's APAC sig meeting, thank you for the info!


for i := range clients {
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil)

Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant(
fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type)))
}
})
}