-
-
Notifications
You must be signed in to change notification settings - Fork 568
feat: add Solace pubsub+ module #3230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
a6a429c
feat: Add solace pubsub+ module
unicod3 429774a
chore: Update docs
unicod3 fd0ef3f
Merge branch 'main' into feature/solace
unicod3 f3b4984
Merge branch 'testcontainers:main' into feature/solace
unicod3 d117eb0
Remove coverage file
unicod3 214e5af
Upgrade the version
unicod3 64d2f3e
Apply PR suggestions
unicod3 0ab42cb
Apply PR suggestions
unicod3 7f2b222
Encapsulate internal variables
unicod3 87a7297
Introduce options pattern
unicod3 8b15624
Update solace.md
unicod3 e24ae01
Use wait package for the container script execution
unicod3 b0c452e
Update solace script
unicod3 e6827ad
Merge branch 'main' into feature/solace
unicod3 6869258
Apply PR suggestions
unicod3 a404209
Apply PR suggestions
unicod3 105fb91
Make use of `CopyToContainer` method
unicod3 e8eaa06
Replace exposed ports to WithServices
unicod3 950367b
Update solace.md
unicod3 86f636a
Render solace script from a template
unicod3 727bc4c
Apply PR suggestions
unicod3 b4ca460
Merge branch 'main' into feature/solace
unicod3 adb935a
Fix linter issues
unicod3 beafbe5
Switch assert to require for tests
unicod3 c165326
Fix image name
unicod3 3198a1c
docs: refinements
mdelapenya df10a36
chore: add a comment for the testable example's output
mdelapenya 07c6d2a
chore: move default services to a variable
mdelapenya 4fd1ed9
Remove sleep from the test
unicod3 58100c9
fix: typo
mdelapenya File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
# Solace Pubsub+ | ||
|
||
Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
## Introduction | ||
|
||
The Testcontainers module for Solace Pubsub+. | ||
|
||
## Adding this module to your project dependencies | ||
|
||
Please run the following command to add the Solace Pubsub+ module to your Go dependencies: | ||
|
||
``` | ||
go get github.com/testcontainers/testcontainers-go/modules/solace | ||
``` | ||
|
||
## Usage example | ||
|
||
<!--codeinclude--> | ||
[Creating a Solace Pubsub+ container](../../modules/solace/examples_test.go) inside_block:runSolaceContainer | ||
<!--/codeinclude--> | ||
|
||
## Module Reference | ||
|
||
### Run function | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
The Solace Pubsub+ module exposes one entrypoint function to create the Solace Pubsub+ container, and this function receives three parameters: | ||
|
||
```golang | ||
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*SolaceContainer, error) | ||
``` | ||
|
||
- `context.Context`, the Go context. | ||
- `string`, the Docker image to use. | ||
- `testcontainers.ContainerCustomizer`, a variadic argument for passing options. | ||
|
||
#### Image | ||
|
||
Use the second argument in the `Run` function to set a valid Docker image. | ||
In example: `Run(context.Background(), "solace/solace-pubsub-standard:latest")`. | ||
|
||
### Container Options | ||
|
||
When starting the Solace Pubsub+ container, you can pass options in a variadic way to configure it. | ||
|
||
#### WithCredentials | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`WithCredentials(username, password string)`: sets the client credentials for authentication | ||
|
||
#### WithVpn | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`WithVpn(vpn string)`: sets the VPN name (defaults to "default") | ||
|
||
#### WithQueue | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`WithQueue(queueName, topic string)`: subscribes a given topic to a queue (for SMF/AMQP testing) | ||
|
||
#### WithShmSize | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`WithShmSize(size int64)`: sets the shared memory size (defaults to 1 GiB) | ||
|
||
#### WithServices | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
The `WithServices` option is the recommended way to configure which Solace services should be exposed and made available in your container. This option automatically handles port exposure and sets up wait strategies for each specified service. | ||
|
||
Available services: | ||
|
||
- `ServiceAMQP` - AMQP service (port 5672) | ||
- `ServiceMQTT` - MQTT service (port 1883) | ||
- `ServiceREST` - REST service (port 9000) | ||
- `ServiceManagement` - Management service (port 8080) | ||
- `ServiceSMF` - SMF service (port 55555) | ||
- `ServiceSMFSSL` - SMF SSL service (port 55443) | ||
|
||
By default, when no `WithServices` option is specified, the container will expose AMQP, SMF, REST, and MQTT services. | ||
|
||
{% include "../features/common_functional_options_list.md" %} | ||
|
||
### Container Methods | ||
|
||
The Solace Pubsub+ container exposes the following methods: | ||
|
||
#### BrokerURLFor | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`BrokerURLFor(ctx context.Context, service Service) (string, error)` - returns the connection URL for a given Solace service. | ||
|
||
This method allows you to retrieve the connection URL for specific Solace services. The available services are: | ||
|
||
- `ServiceAMQP` - AMQP service (port 5672, protocol: amqp) | ||
- `ServiceMQTT` - MQTT service (port 1883, protocol: tcp) | ||
- `ServiceREST` - REST service (port 9000, protocol: http) | ||
- `ServiceManagement` - Management service (port 8080, protocol: http) | ||
- `ServiceSMF` - SMF service (port 55555, protocol: tcp) | ||
- `ServiceSMFSSL` - SMF SSL service (port 55443, protocol: tcps) | ||
|
||
```go | ||
// Get the AMQP connection URL | ||
amqpURL, err := container.BrokerURLFor(ctx, solace.ServiceAMQP) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
// amqpURL will be something like: amqp://localhost:32768 | ||
|
||
// Get the management URL | ||
mgmtURL, err := container.BrokerURLFor(ctx, solace.ServiceManagement) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
// mgmtURL will be something like: http://localhost:32769 | ||
``` | ||
|
||
#### Username | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`Username() string` - returns the configured username for authentication | ||
|
||
#### Password | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`Password() string` - returns the configured password for authentication | ||
|
||
#### VPN | ||
|
||
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
`VPN() string` - returns the configured VPN name |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
include ../../commons-test.mk | ||
|
||
.PHONY: test | ||
test: | ||
$(MAKE) test-solace |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
package solace_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"time" | ||
|
||
"solace.dev/go/messaging" | ||
"solace.dev/go/messaging/pkg/solace" | ||
"solace.dev/go/messaging/pkg/solace/config" | ||
"solace.dev/go/messaging/pkg/solace/message" | ||
"solace.dev/go/messaging/pkg/solace/resource" | ||
|
||
"github.com/testcontainers/testcontainers-go" | ||
sc "github.com/testcontainers/testcontainers-go/modules/solace" | ||
) | ||
|
||
func ExampleRun() { | ||
// runSolaceContainer { | ||
ctx := context.Background() | ||
ctr, err := sc.Run(ctx, "solace/solace-pubsub-standard:latest", | ||
sc.WithCredentials("admin", "admin"), | ||
sc.WithServices(sc.ServiceAMQP, sc.ServiceManagement), | ||
testcontainers.WithEnv(map[string]string{ | ||
"username_admin_globalaccesslevel": "admin", | ||
"username_admin_password": "admin", | ||
}), | ||
sc.WithShmSize(1<<30), | ||
) | ||
defer func() { | ||
if err := testcontainers.TerminateContainer(ctr); err != nil { | ||
log.Printf("failed to terminate container: %s", err) | ||
} | ||
}() | ||
fmt.Println(err) | ||
// } | ||
|
||
// Output: | ||
// <nil> | ||
} | ||
|
||
func ExampleRun_withTopicAndQueue() { | ||
ctx := context.Background() | ||
|
||
ctr, err := sc.Run(ctx, "solace/solace-pubsub-standard:latest", | ||
sc.WithCredentials("admin", "admin"), | ||
sc.WithVPN("test-vpn"), | ||
sc.WithServices(sc.ServiceAMQP, sc.ServiceManagement, sc.ServiceSMF), | ||
testcontainers.WithEnv(map[string]string{ | ||
"username_admin_globalaccesslevel": "admin", | ||
"username_admin_password": "admin", | ||
}), | ||
sc.WithShmSize(1<<30), | ||
sc.WithQueue("TestQueue", "Topic/MyTopic"), | ||
) | ||
defer func() { | ||
if err := testcontainers.TerminateContainer(ctr); err != nil { | ||
log.Printf("failed to terminate container: %s", err) | ||
} | ||
}() | ||
fmt.Println(err) | ||
// the [testMessagePublishAndConsume] function is responsible for printing the output | ||
// to the console, so be aware of that when adding it to other examples. | ||
err = testMessagePublishAndConsume(ctr, "TestQueue", "Topic/MyTopic") | ||
fmt.Println(err) | ||
|
||
// Output: | ||
// <nil> | ||
// Published message to topic: Topic/MyTopic | ||
// Received message: Hello from Solace testcontainers! | ||
// Successfully received message from queue: TestQueue | ||
// <nil> | ||
} | ||
|
||
func testMessagePublishAndConsume(ctr *sc.Container, queueName, topicName string) error { | ||
// Get the SMF service URL from the container | ||
smfURL, err := ctr.BrokerURLFor(context.Background(), sc.ServiceSMF) | ||
if err != nil { | ||
return fmt.Errorf("failed to get SMF URL: %w", err) | ||
} | ||
|
||
// Configure connection properties | ||
brokerConfig := config.ServicePropertyMap{ | ||
config.TransportLayerPropertyHost: smfURL, | ||
config.ServicePropertyVPNName: ctr.VPN(), | ||
config.AuthenticationPropertyScheme: config.AuthenticationSchemeBasic, | ||
config.AuthenticationPropertySchemeBasicUserName: ctr.Username(), | ||
config.AuthenticationPropertySchemeBasicPassword: ctr.Password(), | ||
config.TransportLayerPropertyReconnectionAttempts: 0, | ||
} | ||
|
||
// Build messaging service | ||
messagingService, err := messaging.NewMessagingServiceBuilder(). | ||
FromConfigurationProvider(brokerConfig). | ||
Build() | ||
if err != nil { | ||
return fmt.Errorf("failed to build messaging service: %w", err) | ||
} | ||
|
||
// Connect to the messaging service | ||
if err := messagingService.Connect(); err != nil { | ||
return fmt.Errorf("failed to connect to messaging service: %w", err) | ||
} | ||
defer func() { | ||
if err := messagingService.Disconnect(); err != nil { | ||
log.Printf("Error disconnecting from messaging service: %v", err) | ||
} | ||
}() | ||
|
||
// Test message publishing | ||
err = publishTestMessage(messagingService, topicName) | ||
if err != nil { | ||
return fmt.Errorf("failed to publish message: %w", err) | ||
} | ||
|
||
// Test message consumption from queue | ||
err = consumeTestMessage(messagingService, queueName) | ||
if err != nil { | ||
return fmt.Errorf("failed to consume message: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func publishTestMessage(messagingService solace.MessagingService, topicName string) error { | ||
// Build a direct message publisher | ||
directPublisher, err := messagingService.CreateDirectMessagePublisherBuilder().Build() | ||
if err != nil { | ||
return fmt.Errorf("failed to build publisher: %w", err) | ||
} | ||
|
||
// Start the publisher | ||
if err := directPublisher.Start(); err != nil { | ||
return fmt.Errorf("failed to start publisher: %w", err) | ||
} | ||
defer func() { | ||
if err := directPublisher.Terminate(1 * time.Second); err != nil { | ||
log.Printf("Error terminating direct publisher: %v", err) | ||
} | ||
}() | ||
|
||
// Create a message | ||
messageBuilder := messagingService.MessageBuilder() | ||
message, err := messageBuilder. | ||
WithProperty("custom-property", "test-value"). | ||
BuildWithStringPayload("Hello from Solace testcontainers!") | ||
if err != nil { | ||
return fmt.Errorf("failed to build message: %w", err) | ||
} | ||
|
||
// Create topic resource | ||
topic := resource.TopicOf(topicName) | ||
|
||
// Publish the message | ||
if err := directPublisher.Publish(message, topic); err != nil { | ||
return fmt.Errorf("failed to publish message: %w", err) | ||
} | ||
|
||
fmt.Printf("Published message to topic: %s\n", topicName) | ||
return nil | ||
} | ||
|
||
func consumeTestMessage(messagingService solace.MessagingService, queueName string) error { | ||
// Build a persistent message receiver | ||
persistentReceiver, err := messagingService.CreatePersistentMessageReceiverBuilder(). | ||
Build(resource.QueueDurableExclusive(queueName)) | ||
if err != nil { | ||
return fmt.Errorf("failed to build receiver: %w", err) | ||
} | ||
|
||
// Set up message handler | ||
messageReceived := make(chan message.InboundMessage, 1) | ||
errorChan := make(chan error, 1) | ||
|
||
messageHandler := func(msg message.InboundMessage) { | ||
payload, ok := msg.GetPayloadAsString() | ||
if ok { | ||
fmt.Printf("Received message: %s\n", payload) | ||
} | ||
messageReceived <- msg | ||
} | ||
|
||
// Start the receiver | ||
if err := persistentReceiver.Start(); err != nil { | ||
return fmt.Errorf("failed to start receiver: %w", err) | ||
} | ||
defer func() { | ||
if err := persistentReceiver.Terminate(1 * time.Second); err != nil { | ||
log.Printf("Error terminating persistent receiver: %v", err) | ||
} | ||
}() | ||
|
||
// Receive messages asynchronously | ||
if err := persistentReceiver.ReceiveAsync(messageHandler); err != nil { | ||
return fmt.Errorf("failed to start async receive: %w", err) | ||
} | ||
|
||
// Wait for message with timeout | ||
select { | ||
case <-messageReceived: | ||
fmt.Printf("Successfully received message from queue: %s\n", queueName) | ||
// For persistent messages, acknowledgment is typically handled automatically | ||
// or through the receiver's configuration | ||
return nil | ||
case err := <-errorChan: | ||
return fmt.Errorf("error receiving message: %w", err) | ||
case <-time.After(15 * time.Second): | ||
return fmt.Errorf("timeout waiting for message from queue: %s", queueName) | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.