From a71b36475de05197c583e61f3fac3a3e7916dcac Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 26 Jun 2025 18:41:48 +0700 Subject: [PATCH 1/2] feat: Support Azure SB for publishmq --- internal/config/publishmq.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/internal/config/publishmq.go b/internal/config/publishmq.go index 03dfa32c..d77d4172 100644 --- a/internal/config/publishmq.go +++ b/internal/config/publishmq.go @@ -14,6 +14,12 @@ type PublishAWSSQSConfig struct { Queue string `yaml:"queue" env:"PUBLISH_AWS_SQS_QUEUE" desc:"Name of the SQS queue for publishing events. Required if AWS SQS is the chosen publish MQ provider." required:"C"` } +type PublishAzureServiceBusConfig struct { + ConnectionString string `yaml:"connection_string" env:"PUBLISH_AZURE_SERVICEBUS_CONNECTION_STRING" desc:"Azure Service Bus connection string for the publish queue. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"` + Topic string `yaml:"topic" env:"PUBLISH_AZURE_SERVICEBUS_TOPIC" desc:"Name of the Azure Service Bus topic for publishing events. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"` + Subscription string `yaml:"subscription" env:"PUBLISH_AZURE_SERVICEBUS_SUBSCRIPTION" desc:"Name of the Azure Service Bus subscription to read published events from. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"` +} + type PublishGCPPubSubConfig struct { Project string `yaml:"project" env:"PUBLISH_GCP_PUBSUB_PROJECT" desc:"GCP Project ID for the Pub/Sub publish topic. Required if GCP Pub/Sub is the chosen publish MQ provider." required:"C"` Topic string `yaml:"topic" env:"PUBLISH_GCP_PUBSUB_TOPIC" desc:"Name of the GCP Pub/Sub topic for publishing events. Required if GCP Pub/Sub is the chosen publish MQ provider." required:"C"` @@ -28,15 +34,19 @@ type PublishRabbitMQConfig struct { } type PublishMQConfig struct { - AWSSQS PublishAWSSQSConfig `yaml:"aws_sqs" desc:"Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured." required:"N"` - GCPPubSub PublishGCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the publish message queue. Only one publish MQ provider should be configured." required:"N"` - RabbitMQ PublishRabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the publish message queue. Only one publish MQ provider should be configured." required:"N"` + AWSSQS PublishAWSSQSConfig `yaml:"aws_sqs" desc:"Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured." required:"N"` + AzureServiceBus PublishAzureServiceBusConfig `yaml:"azure_servicebus" desc:"Configuration for using Azure Service Bus as the publish message queue. Only one publish MQ provider should be configured." required:"N"` + GCPPubSub PublishGCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the publish message queue. Only one publish MQ provider should be configured." required:"N"` + RabbitMQ PublishRabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the publish message queue. Only one publish MQ provider should be configured." required:"N"` } func (c PublishMQConfig) GetInfraType() string { if hasPublishAWSSQSConfig(c.AWSSQS) { return "awssqs" } + if hasPublishAzureServiceBusConfig(c.AzureServiceBus) { + return "azureservicebus" + } if hasPublishGCPPubSubConfig(c.GCPPubSub) { return "gcppubsub" } @@ -59,6 +69,14 @@ func (c *PublishMQConfig) GetQueueConfig() *mqs.QueueConfig { Topic: c.AWSSQS.Queue, }, } + case "azureservicebus": + return &mqs.QueueConfig{ + AzureServiceBus: &mqs.AzureServiceBusConfig{ + ConnectionString: c.AzureServiceBus.ConnectionString, + Topic: c.AzureServiceBus.Topic, + Subscription: c.AzureServiceBus.Subscription, + }, + } case "gcppubsub": return &mqs.QueueConfig{ GCPPubSub: &mqs.GCPPubSubConfig{ @@ -86,6 +104,10 @@ func hasPublishAWSSQSConfig(config PublishAWSSQSConfig) bool { config.SecretAccessKey != "" && config.Region != "" } +func hasPublishAzureServiceBusConfig(config PublishAzureServiceBusConfig) bool { + return config.ConnectionString != "" && config.Topic != "" && config.Subscription != "" +} + func hasPublishGCPPubSubConfig(config PublishGCPPubSubConfig) bool { return config.Project != "" } From b484615cdb075d8551cad0a9a808b306ca7ef0b1 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 26 Jun 2025 18:42:22 +0700 Subject: [PATCH 2/2] chore: Configure publishmq in emulator for local dev & local publishmq helper --- build/dev/azure/config.json | 9 ++++++ cmd/publish/declare_handler.go | 2 ++ cmd/publish/publish_azure.go | 58 ++++++++++++++++++++++++++++++++++ cmd/publish/publish_handler.go | 2 ++ 4 files changed, 71 insertions(+) create mode 100644 cmd/publish/publish_azure.go diff --git a/build/dev/azure/config.json b/build/dev/azure/config.json index 825d1f20..a8419d68 100644 --- a/build/dev/azure/config.json +++ b/build/dev/azure/config.json @@ -32,6 +32,15 @@ } ] }, + // publishmq + { + "Name": "outpost-publish", + "Subscriptions": [ + { + "Name": "outpost-publish-sub" + } + ] + }, // tests { diff --git a/cmd/publish/declare_handler.go b/cmd/publish/declare_handler.go index fa1db6af..89053a33 100644 --- a/cmd/publish/declare_handler.go +++ b/cmd/publish/declare_handler.go @@ -10,6 +10,8 @@ func handleDeclare(w http.ResponseWriter, r *http.Request) { switch r.URL.Query().Get("method") { case "aws_sqs": err = declareAWS() + case "azure_servicebus": + err = declareAzureServiceBus() case "gcp_pubsub": err = declareGCP() case "rabbitmq": diff --git a/cmd/publish/publish_azure.go b/cmd/publish/publish_azure.go new file mode 100644 index 00000000..18d8cb4f --- /dev/null +++ b/cmd/publish/publish_azure.go @@ -0,0 +1,58 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +const ( + AzureServiceBusConnectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" + AzureServiceBusTopic = "outpost-publish" + AzureServiceBusSubscription = "outpost-publish-sub" +) + +func publishAzureServiceBus(body map[string]interface{}) error { + log.Printf("[x] Publishing Azure Service Bus") + + ctx := context.Background() + client, err := azservicebus.NewClientFromConnectionString(AzureServiceBusConnectionString, nil) + if err != nil { + return fmt.Errorf("failed to create Azure Service Bus client: %w", err) + } + defer client.Close(ctx) + + sender, err := client.NewSender(AzureServiceBusTopic, nil) + if err != nil { + return fmt.Errorf("failed to create sender: %w", err) + } + defer sender.Close(ctx) + + messageBody, err := json.Marshal(body) + if err != nil { + return err + } + + message := &azservicebus.Message{ + Body: messageBody, + ApplicationProperties: map[string]interface{}{ + "source": "outpost-publish", + }, + } + + err = sender.SendMessage(ctx, message, nil) + if err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + log.Printf("[x] Published message to Azure Service Bus topic %s", AzureServiceBusTopic) + return nil +} + +func declareAzureServiceBus() error { + log.Printf("[*] Declaring Azure Service Bus Publish infra") + return fmt.Errorf("azure sb emulator does not support declaring topics and subscriptions. Use `%s` and `%s` for the publishmq topic and subscription", AzureServiceBusTopic, AzureServiceBusSubscription) +} diff --git a/cmd/publish/publish_handler.go b/cmd/publish/publish_handler.go index 21fa7f1e..e40e96e8 100644 --- a/cmd/publish/publish_handler.go +++ b/cmd/publish/publish_handler.go @@ -17,6 +17,8 @@ func handlePublish(w http.ResponseWriter, r *http.Request) { switch r.URL.Query().Get("method") { case "aws_sqs": err = publishAWS(body) + case "azure_servicebus": + err = publishAzureServiceBus(body) case "gcp_pubsub": err = publishGCP(body) case "rabbitmq":