Skip to content

Feature: Add MSK IAM auth plugin with token refresh #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile.all
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RUN mkdir -p build && \
go build -mod=vendor -o build/kafka-proxy -ldflags "${LDFLAGS}" . && \
go build -mod=vendor -o build/auth-user -ldflags "${LDFLAGS}" cmd/plugin-auth-user/main.go && \
go build -mod=vendor -o build/auth-ldap -ldflags "${LDFLAGS}" cmd/plugin-auth-ldap/main.go && \
go build -mod=vendor -o build/aws-msk-iam-provider -ldflags "${LDFLAGS}" cmd/plugin-aws-msk-iam-provider/main.go && \
go build -mod=vendor -o build/google-id-provider -ldflags "${LDFLAGS}" cmd/plugin-googleid-provider/main.go && \
go build -mod=vendor -o build/google-id-info -ldflags "${LDFLAGS}" cmd/plugin-googleid-info/main.go && \
go build -mod=vendor -o build/unsecured-jwt-info -ldflags "${LDFLAGS}" cmd/plugin-unsecured-jwt-info/main.go && \
Expand All @@ -34,6 +35,7 @@ COPY --from=builder /go/src/github.com/grepplabs/kafka-proxy/build /opt/kafka-pr
RUN setcap 'cap_net_bind_service=+ep' /opt/kafka-proxy/bin/kafka-proxy && \
setcap 'cap_net_bind_service=+ep' /opt/kafka-proxy/bin/auth-user && \
setcap 'cap_net_bind_service=+ep' /opt/kafka-proxy/bin/auth-ldap && \
setcap 'cap_net_bind_service=+ep' /opt/kafka-proxy/bin/aws-msk-iam-provider && \
setcap 'cap_net_bind_service=+ep' /opt/kafka-proxy/bin/google-id-provider && \
setcap 'cap_net_bind_service=+ep' /opt/kafka-proxy/bin/google-id-info && \
setcap 'cap_net_bind_service=+ep' /opt/kafka-proxy/bin/unsecured-jwt-info && \
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ plugin.auth-user:
plugin.auth-ldap:
CGO_ENABLED=0 go build -o build/auth-ldap $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-ldap/main.go

plugin.aws-msk-iam-provider:
CGO_ENABLED=0 go build -o build/aws-msk-iam-provider $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-aws-msk-iam-provider/main.go

plugin.google-id-provider:
CGO_ENABLED=0 go build -o build/google-id-provider $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-googleid-provider/main.go

Expand All @@ -114,7 +117,7 @@ plugin.unsecured-jwt-provider:
plugin.oidc-provider:
CGO_ENABLED=0 go build -o build/oidc-provider $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-oidc-provider/main.go

all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info plugin.unsecured-jwt-provider plugin.oidc-provider
all: build plugin.auth-user plugin.auth-ldap plugin.aws-msk-iam-provider plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info plugin.unsecured-jwt-provider plugin.oidc-provider

clean:
rm -rf $(ROOT_DIR)/build
106 changes: 106 additions & 0 deletions cmd/plugin-aws-msk-iam-provider/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# AWS MSK IAM Authentication Plugin

This plugin provides AWS MSK IAM authentication for the Kafka Proxy using the SASL/OAUTHBEARER mechanism. It refreshes tokens automatically so that authentication does not expire.

## Features

- **Multiple Authentication Methods**: Supports default credentials, named profiles, and IAM role assumption
- **Automatic Token Refresh**: Tokens are automatically refreshed 5 minutes before expiry
- **Role Assumption**: Supports assuming IAM roles with optional external ID
- **Error Handling**: Robust error handling with exponential backoff for token generation

## Configuration Options

| Option | Required | Description | Default |
|--------|----------|-------------|---------|
| `--region` | Yes | AWS region where MSK cluster is located | - |
| `--profile` | No | AWS named profile to use for authentication | - |
| `--role-arn` | No | IAM role ARN to assume | - |
| `--external-id` | No | External ID for role assumption | - |
| `--session-name` | No | Session name for role assumption | `kafka-proxy` |
| `--timeout` | No | Request timeout in seconds | `10` |

## Authentication Methods

### 1. Default Credentials
Uses the default AWS credential provider chain (environment variables, IAM roles, etc.).

```bash
./aws-msk-iam-provider --region us-east-1
```

### 2. Named Profile
Uses a specific AWS named profile.

```bash
./aws-msk-iam-provider --region us-east-1 --profile my-profile
```

### 3. IAM Role Assumption
Assumes an IAM role for authentication.

```bash
./aws-msk-iam-provider --region us-east-1 --role-arn arn:aws:iam::123456789012:role/MyRole --session-name kafka-proxy
```

### 4. IAM Role with External ID
Assumes an IAM role with external ID for enhanced security.

```bash
./aws-msk-iam-provider --region us-east-1 --role-arn arn:aws:iam::123456789012:role/MyRole --external-id my-external-id --session-name kafka-proxy
```

## Integration with Kafka Proxy

To use this plugin with the Kafka Proxy, configure it in your proxy configuration:

```yaml
token-provider:
plugin: aws-msk-iam-provider
args:
- "--region=us-east-1"
- "--profile=my-profile"
# or for role assumption:
# - "--role-arn=arn:aws:iam::123456789012:role/MyRole"
# - "--session-name=kafka-proxy"
```

## Token Refresh

The plugin automatically refreshes tokens 5 minutes before they expire. AWS MSK IAM tokens are typically valid for 15 minutes, and the plugin sets an internal expiry of 14 minutes to ensure safe refresh timing.

## Error Handling

- **Initial Token Generation**: Uses exponential backoff with up to 3 retries
- **Token Refresh**: Uses exponential backoff with up to 60 seconds max elapsed time
- **Graceful Degradation**: Continues operating if a refresh fails and retries on the next request

## Dependencies

This plugin uses the [AWS MSK IAM SASL Signer for Go](https://github.com/aws/aws-msk-iam-sasl-signer-go) library to generate authentication tokens.

## Troubleshooting

### Common Issues

1. **Access Denied**: Ensure the IAM user/role has the necessary permissions for MSK cluster access
2. **Region Mismatch**: Verify the region matches your MSK cluster location
3. **Profile Not Found**: Check that the AWS profile exists in your credentials file
4. **Role Assumption Failed**: Verify the role ARN and ensure your credentials can assume the role

### Debug Mode

To enable debug logging for AWS credentials, set the environment variable:

```bash
export AWS_DEBUG_CREDS=true
```

This will log the IAM identity being used for authentication.

## Security Considerations

- **External ID**: Use external ID when assuming roles for enhanced security
- **Session Names**: Use descriptive session names for better audit trails
- **Credentials**: Ensure credentials are properly secured and rotated regularly
- **Network**: Use TLS encryption for all communications with MSK clusters
35 changes: 35 additions & 0 deletions cmd/plugin-aws-msk-iam-provider/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"os"

awsmskiamprovider "github.com/grepplabs/kafka-proxy/pkg/libs/aws-msk-iam-provider"
"github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
"github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
)

// main builds the AWS MSK IAM token provider from the process arguments and
// serves it as a go-plugin "tokenProvider" plugin. If the provider cannot be
// created, the error is logged and the process exits with status 1.
func main() {
	// The factory is given the AWS MSK IAM token signer; the plugin arguments
	// (everything after the program name) are passed through unchanged.
	factory := awsmskiamprovider.NewFactory(awsmskiamprovider.NewAwsMskIamTokenSigner())

	tokenProvider, err := factory.New(os.Args[1:])
	if err != nil {
		logrus.Errorf("cannot initialize aws-msk-iam-token provider: %v", err)
		os.Exit(1)
	}

	pluginSet := map[string]plugin.Plugin{
		"tokenProvider": &shared.TokenProviderPlugin{Impl: tokenProvider},
	}

	plugin.Serve(&plugin.ServeConfig{
		HandshakeConfig: shared.Handshake,
		Plugins:         pluginSet,
		// A non-nil value here enables gRPC serving for this plugin.
		GRPCServer: plugin.DefaultGRPCServer,
	})
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/grepplabs/kafka-proxy

go 1.23
go 1.23.0

require (
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
Expand Down Expand Up @@ -39,6 +39,7 @@ require (
require (
cloud.google.com/go/compute/metadata v0.5.2 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go-v2 v1.36.1 h1:iTDl5U6oAhkNPba0e1t1hrwAo02ZMqbrGq4k5JBWM5E=
github.com/aws/aws-sdk-go-v2 v1.36.1/go.mod h1:5PMILGVKiW32oDzjj6RU52yrNrDPUHcbZQYr1sM7qmM=
github.com/aws/aws-sdk-go-v2/config v1.29.6 h1:fqgqEKK5HaZVWLQoLiC9Q+xDlSp+1LYidp6ybGE2OGg=
Expand Down
71 changes: 71 additions & 0 deletions pkg/libs/aws-msk-iam-provider/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package awsmskiamprovider

import (
	"flag"
	"fmt"

	"github.com/grepplabs/kafka-proxy/pkg/apis"
	"github.com/grepplabs/kafka-proxy/pkg/registry"
)

// init declares the apis.TokenProviderFactory component interface with the
// registry and then registers a Factory under the name "aws-msk-iam-provider".
func init() {
	registry.NewComponentInterface(new(apis.TokenProviderFactory))
	registry.Register(&Factory{}, "aws-msk-iam-provider")
}

// pluginMeta holds the raw values of the plugin's command-line parameters.
type pluginMeta struct {
	region      string // AWS region
	profile     string // AWS named profile
	roleARN     string // IAM role ARN to assume
	externalID  string // external ID for role assumption
	sessionName string // session name for role assumption
	timeout     int    // request timeout in seconds
}

// flagSet returns a new, empty flag set for the provider settings. It uses
// flag.ContinueOnError, so parse failures are returned to the caller instead
// of terminating the process.
func (*pluginMeta) flagSet() *flag.FlagSet {
	return flag.NewFlagSet("aws msk iam provider settings", flag.ContinueOnError)
}

// Factory creates AWS MSK IAM token providers. Providers built by New are
// given the factory's signer.
type Factory struct {
	signer TokenSigner
}

// NewFactory returns a Factory that uses the given signer.
func NewFactory(signer TokenSigner) *Factory {
	f := new(Factory)
	f.signer = signer
	return f
}

// New implements apis.TokenProviderFactory.
//
// It parses params as command-line style flags and builds a token provider
// from them using the factory's signer. Recognized flags:
//
//	--region        AWS region (required)
//	--profile       AWS named profile (optional)
//	--role-arn      IAM role ARN to assume (optional)
//	--external-id   External ID for role assumption (optional)
//	--session-name  Session name for role assumption (default "kafka-proxy")
//	--timeout       Request timeout in seconds (default 10)
//
// A flag parsing failure is returned as reported by the flag package. If
// --region is missing or empty, the returned error names the missing
// parameter and wraps flag.ErrHelp, so errors.Is(err, flag.ErrHelp) holds.
func (t *Factory) New(params []string) (apis.TokenProvider, error) {
	// Named meta rather than pluginMeta so the local does not shadow the type.
	meta := &pluginMeta{}
	fs := meta.flagSet()
	fs.StringVar(&meta.region, "region", "", "AWS region (required)")
	fs.StringVar(&meta.profile, "profile", "", "AWS named profile (optional)")
	fs.StringVar(&meta.roleARN, "role-arn", "", "IAM role ARN to assume (optional)")
	fs.StringVar(&meta.externalID, "external-id", "", "External ID for role assumption (optional)")
	fs.StringVar(&meta.sessionName, "session-name", "kafka-proxy", "Session name for role assumption")
	fs.IntVar(&meta.timeout, "timeout", 10, "Request timeout in seconds")

	if err := fs.Parse(params); err != nil {
		return nil, err
	}

	if meta.region == "" {
		// A bare flag.ErrHelp ("flag: help requested") does not say what is
		// wrong; name the missing parameter and keep ErrHelp in the chain.
		return nil, fmt.Errorf("missing required parameter --region: %w", flag.ErrHelp)
	}

	return NewTokenProvider(TokenProviderOptions{
		Region:      meta.region,
		Profile:     meta.profile,
		RoleARN:     meta.roleARN,
		ExternalID:  meta.externalID,
		SessionName: meta.sessionName,
		Timeout:     meta.timeout,
	}, t.signer)
}
60 changes: 60 additions & 0 deletions pkg/libs/aws-msk-iam-provider/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package awsmskiamprovider

import (
"testing"

"github.com/grepplabs/kafka-proxy/pkg/apis"
"github.com/stretchr/testify/assert"
)

func TestFactory_New(t *testing.T) {
tests := []struct {
name string
params []string
wantErr bool
errorSubstr string
}{
{
name: "missing region",
params: []string{},
wantErr: true,
errorSubstr: "help requested",
},
{
name: "invalid flag",
params: []string{"--invalid-flag"},
wantErr: true,
errorSubstr: "flag provided but not defined",
},
{
name: "happy path",
params: []string{"--region", "us-east-1"},
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockSigner := NewMockTokenSigner()
factory := NewFactory(mockSigner)
provider, err := factory.New(tt.params)

if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, provider)
if tt.errorSubstr != "" {
assert.Contains(t, err.Error(), tt.errorSubstr)
}
} else {
assert.NoError(t, err)
assert.NotNil(t, provider)
}
})
}
}

func TestFactory_ImplementsInterface(t *testing.T) {
mockSigner := NewMockTokenSigner()
factory := NewFactory(mockSigner)
assert.Implements(t, (*apis.TokenProviderFactory)(nil), factory)
}
Loading