Skip to content

feat: Support Azure SB for publishmq #435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: azure-emulator
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions build/dev/azure/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@
}
]
},
// publishmq
{
"Name": "outpost-publish",
"Subscriptions": [
{
"Name": "outpost-publish-sub"
}
]
},

// tests
{
Expand Down
2 changes: 2 additions & 0 deletions cmd/publish/declare_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ func handleDeclare(w http.ResponseWriter, r *http.Request) {
switch r.URL.Query().Get("method") {
case "aws_sqs":
err = declareAWS()
case "azure_servicebus":
err = declareAzureServiceBus()
case "gcp_pubsub":
err = declareGCP()
case "rabbitmq":
Expand Down
58 changes: 58 additions & 0 deletions cmd/publish/publish_azure.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename file to publish_azureservicebus?

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

const (
AzureServiceBusConnectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
AzureServiceBusTopic = "outpost-publish"
AzureServiceBusSubscription = "outpost-publish-sub"
)

func publishAzureServiceBus(body map[string]interface{}) error {
log.Printf("[x] Publishing Azure Service Bus")

ctx := context.Background()
client, err := azservicebus.NewClientFromConnectionString(AzureServiceBusConnectionString, nil)
if err != nil {
return fmt.Errorf("failed to create Azure Service Bus client: %w", err)
}
defer client.Close(ctx)

sender, err := client.NewSender(AzureServiceBusTopic, nil)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}
defer sender.Close(ctx)

messageBody, err := json.Marshal(body)
if err != nil {
return err
}

message := &azservicebus.Message{
Body: messageBody,
ApplicationProperties: map[string]interface{}{
"source": "outpost-publish",
},
}

err = sender.SendMessage(ctx, message, nil)
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}

log.Printf("[x] Published message to Azure Service Bus topic %s", AzureServiceBusTopic)
return nil
}

func declareAzureServiceBus() error {
log.Printf("[*] Declaring Azure Service Bus Publish infra")
return fmt.Errorf("azure sb emulator does not support declaring topics and subscriptions. Use `%s` and `%s` for the publishmq topic and subscription", AzureServiceBusTopic, AzureServiceBusSubscription)
}
2 changes: 2 additions & 0 deletions cmd/publish/publish_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func handlePublish(w http.ResponseWriter, r *http.Request) {
switch r.URL.Query().Get("method") {
case "aws_sqs":
err = publishAWS(body)
case "azure_servicebus":
err = publishAzureServiceBus(body)
case "gcp_pubsub":
err = publishGCP(body)
case "rabbitmq":
Expand Down
28 changes: 25 additions & 3 deletions internal/config/publishmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ type PublishAWSSQSConfig struct {
Queue string `yaml:"queue" env:"PUBLISH_AWS_SQS_QUEUE" desc:"Name of the SQS queue for publishing events. Required if AWS SQS is the chosen publish MQ provider." required:"C"`
}

type PublishAzureServiceBusConfig struct {
ConnectionString string `yaml:"connection_string" env:"PUBLISH_AZURE_SERVICEBUS_CONNECTION_STRING" desc:"Azure Service Bus connection string for the publish queue. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"`
Topic string `yaml:"topic" env:"PUBLISH_AZURE_SERVICEBUS_TOPIC" desc:"Name of the Azure Service Bus topic for publishing events. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"`
Subscription string `yaml:"subscription" env:"PUBLISH_AZURE_SERVICEBUS_SUBSCRIPTION" desc:"Name of the Azure Service Bus subscription to read published events from. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"`
}

type PublishGCPPubSubConfig struct {
Project string `yaml:"project" env:"PUBLISH_GCP_PUBSUB_PROJECT" desc:"GCP Project ID for the Pub/Sub publish topic. Required if GCP Pub/Sub is the chosen publish MQ provider." required:"C"`
Topic string `yaml:"topic" env:"PUBLISH_GCP_PUBSUB_TOPIC" desc:"Name of the GCP Pub/Sub topic for publishing events. Required if GCP Pub/Sub is the chosen publish MQ provider." required:"C"`
Expand All @@ -28,15 +34,19 @@ type PublishRabbitMQConfig struct {
}

type PublishMQConfig struct {
AWSSQS PublishAWSSQSConfig `yaml:"aws_sqs" desc:"Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
GCPPubSub PublishGCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
RabbitMQ PublishRabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
AWSSQS PublishAWSSQSConfig `yaml:"aws_sqs" desc:"Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
AzureServiceBus PublishAzureServiceBusConfig `yaml:"azure_servicebus" desc:"Configuration for using Azure Service Bus as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
GCPPubSub PublishGCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
RabbitMQ PublishRabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
}

func (c PublishMQConfig) GetInfraType() string {
if hasPublishAWSSQSConfig(c.AWSSQS) {
return "awssqs"
}
if hasPublishAzureServiceBusConfig(c.AzureServiceBus) {
return "azureservicebus"
}
if hasPublishGCPPubSubConfig(c.GCPPubSub) {
return "gcppubsub"
}
Expand All @@ -59,6 +69,14 @@ func (c *PublishMQConfig) GetQueueConfig() *mqs.QueueConfig {
Topic: c.AWSSQS.Queue,
},
}
case "azureservicebus":
return &mqs.QueueConfig{
AzureServiceBus: &mqs.AzureServiceBusConfig{
ConnectionString: c.AzureServiceBus.ConnectionString,
Topic: c.AzureServiceBus.Topic,
Subscription: c.AzureServiceBus.Subscription,
},
}
case "gcppubsub":
return &mqs.QueueConfig{
GCPPubSub: &mqs.GCPPubSubConfig{
Expand Down Expand Up @@ -86,6 +104,10 @@ func hasPublishAWSSQSConfig(config PublishAWSSQSConfig) bool {
config.SecretAccessKey != "" && config.Region != ""
}

func hasPublishAzureServiceBusConfig(config PublishAzureServiceBusConfig) bool {
return config.ConnectionString != "" && config.Topic != "" && config.Subscription != ""
}

func hasPublishGCPPubSubConfig(config PublishGCPPubSubConfig) bool {
return config.Project != ""
}
Expand Down