Skip to content

Commit 9cc62ee

Browse files
authored
feat: Support Azure SB for publishmq (#435)
* feat: Support Azure SB for publishmq * chore: Configure publishmq in emulator for local dev & local publishmq helper * chore: rename file to specify mq services
1 parent 0b3de87 commit 9cc62ee

File tree

7 files changed

+96
-3
lines changed

7 files changed

+96
-3
lines changed

build/dev/azure/config.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@
3232
}
3333
]
3434
},
35+
// publishmq
36+
{
37+
"Name": "outpost-publish",
38+
"Subscriptions": [
39+
{
40+
"Name": "outpost-publish-sub"
41+
}
42+
]
43+
},
3544

3645
// tests
3746
{

cmd/publish/declare_handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ func handleDeclare(w http.ResponseWriter, r *http.Request) {
1010
switch r.URL.Query().Get("method") {
1111
case "aws_sqs":
1212
err = declareAWS()
13+
case "azure_servicebus":
14+
err = declareAzureServiceBus()
1315
case "gcp_pubsub":
1416
err = declareGCP()
1517
case "rabbitmq":
File renamed without changes.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
9+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
10+
)
11+
12+
const (
13+
AzureServiceBusConnectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
14+
AzureServiceBusTopic = "outpost-publish"
15+
AzureServiceBusSubscription = "outpost-publish-sub"
16+
)
17+
18+
func publishAzureServiceBus(body map[string]interface{}) error {
19+
log.Printf("[x] Publishing Azure Service Bus")
20+
21+
ctx := context.Background()
22+
client, err := azservicebus.NewClientFromConnectionString(AzureServiceBusConnectionString, nil)
23+
if err != nil {
24+
return fmt.Errorf("failed to create Azure Service Bus client: %w", err)
25+
}
26+
defer client.Close(ctx)
27+
28+
sender, err := client.NewSender(AzureServiceBusTopic, nil)
29+
if err != nil {
30+
return fmt.Errorf("failed to create sender: %w", err)
31+
}
32+
defer sender.Close(ctx)
33+
34+
messageBody, err := json.Marshal(body)
35+
if err != nil {
36+
return err
37+
}
38+
39+
message := &azservicebus.Message{
40+
Body: messageBody,
41+
ApplicationProperties: map[string]interface{}{
42+
"source": "outpost-publish",
43+
},
44+
}
45+
46+
err = sender.SendMessage(ctx, message, nil)
47+
if err != nil {
48+
return fmt.Errorf("failed to send message: %w", err)
49+
}
50+
51+
log.Printf("[x] Published message to Azure Service Bus topic %s", AzureServiceBusTopic)
52+
return nil
53+
}
54+
55+
func declareAzureServiceBus() error {
56+
log.Printf("[*] Declaring Azure Service Bus Publish infra")
57+
return fmt.Errorf("azure sb emulator does not support declaring topics and subscriptions. Use `%s` and `%s` for the publishmq topic and subscription", AzureServiceBusTopic, AzureServiceBusSubscription)
58+
}
File renamed without changes.

cmd/publish/publish_handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ func handlePublish(w http.ResponseWriter, r *http.Request) {
1717
switch r.URL.Query().Get("method") {
1818
case "aws_sqs":
1919
err = publishAWS(body)
20+
case "azure_servicebus":
21+
err = publishAzureServiceBus(body)
2022
case "gcp_pubsub":
2123
err = publishGCP(body)
2224
case "rabbitmq":

internal/config/publishmq.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ type PublishAWSSQSConfig struct {
1414
Queue string `yaml:"queue" env:"PUBLISH_AWS_SQS_QUEUE" desc:"Name of the SQS queue for publishing events. Required if AWS SQS is the chosen publish MQ provider." required:"C"`
1515
}
1616

17+
type PublishAzureServiceBusConfig struct {
18+
ConnectionString string `yaml:"connection_string" env:"PUBLISH_AZURE_SERVICEBUS_CONNECTION_STRING" desc:"Azure Service Bus connection string for the publish queue. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"`
19+
Topic string `yaml:"topic" env:"PUBLISH_AZURE_SERVICEBUS_TOPIC" desc:"Name of the Azure Service Bus topic for publishing events. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"`
20+
Subscription string `yaml:"subscription" env:"PUBLISH_AZURE_SERVICEBUS_SUBSCRIPTION" desc:"Name of the Azure Service Bus subscription to read published events from. Required if Azure Service Bus is the chosen publish MQ provider." required:"C"`
21+
}
22+
1723
type PublishGCPPubSubConfig struct {
1824
Project string `yaml:"project" env:"PUBLISH_GCP_PUBSUB_PROJECT" desc:"GCP Project ID for the Pub/Sub publish topic. Required if GCP Pub/Sub is the chosen publish MQ provider." required:"C"`
1925
Topic string `yaml:"topic" env:"PUBLISH_GCP_PUBSUB_TOPIC" desc:"Name of the GCP Pub/Sub topic for publishing events. Required if GCP Pub/Sub is the chosen publish MQ provider." required:"C"`
@@ -28,15 +34,19 @@ type PublishRabbitMQConfig struct {
2834
}
2935

3036
type PublishMQConfig struct {
31-
AWSSQS PublishAWSSQSConfig `yaml:"aws_sqs" desc:"Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
32-
GCPPubSub PublishGCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
33-
RabbitMQ PublishRabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
37+
AWSSQS PublishAWSSQSConfig `yaml:"aws_sqs" desc:"Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
38+
AzureServiceBus PublishAzureServiceBusConfig `yaml:"azure_servicebus" desc:"Configuration for using Azure Service Bus as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
39+
GCPPubSub PublishGCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
40+
RabbitMQ PublishRabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the publish message queue. Only one publish MQ provider should be configured." required:"N"`
3441
}
3542

3643
func (c PublishMQConfig) GetInfraType() string {
3744
if hasPublishAWSSQSConfig(c.AWSSQS) {
3845
return "awssqs"
3946
}
47+
if hasPublishAzureServiceBusConfig(c.AzureServiceBus) {
48+
return "azureservicebus"
49+
}
4050
if hasPublishGCPPubSubConfig(c.GCPPubSub) {
4151
return "gcppubsub"
4252
}
@@ -59,6 +69,14 @@ func (c *PublishMQConfig) GetQueueConfig() *mqs.QueueConfig {
5969
Topic: c.AWSSQS.Queue,
6070
},
6171
}
72+
case "azureservicebus":
73+
return &mqs.QueueConfig{
74+
AzureServiceBus: &mqs.AzureServiceBusConfig{
75+
ConnectionString: c.AzureServiceBus.ConnectionString,
76+
Topic: c.AzureServiceBus.Topic,
77+
Subscription: c.AzureServiceBus.Subscription,
78+
},
79+
}
6280
case "gcppubsub":
6381
return &mqs.QueueConfig{
6482
GCPPubSub: &mqs.GCPPubSubConfig{
@@ -86,6 +104,10 @@ func hasPublishAWSSQSConfig(config PublishAWSSQSConfig) bool {
86104
config.SecretAccessKey != "" && config.Region != ""
87105
}
88106

107+
func hasPublishAzureServiceBusConfig(config PublishAzureServiceBusConfig) bool {
108+
return config.ConnectionString != "" && config.Topic != "" && config.Subscription != ""
109+
}
110+
89111
func hasPublishGCPPubSubConfig(config PublishGCPPubSubConfig) bool {
90112
return config.Project != ""
91113
}

0 commit comments

Comments
 (0)