Skip to content

Add DynamoDB support #2090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions go.work
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
go 1.24.0

toolchain go1.24.0
go 1.24.4

use (
.
Expand All @@ -14,6 +12,7 @@ use (
./pkg/gofr/datasource/file/s3
./pkg/gofr/datasource/file/sftp
./pkg/gofr/datasource/kv-store/badger
./pkg/gofr/datasource/kv-store/dynamodb
./pkg/gofr/datasource/kv-store/nats
./pkg/gofr/datasource/mongo
./pkg/gofr/datasource/opentsdb
Expand Down
5 changes: 5 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
Expand Down Expand Up @@ -1144,13 +1145,15 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q=
go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 h1:WDdP9acbMYjbKIyJUhTvtzj601sVJOqgWdUxSdR/Ysc=
Expand All @@ -1163,6 +1166,7 @@ go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzau
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc=
go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok=
go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU=
Expand All @@ -1175,6 +1179,7 @@ go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06F
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
Expand Down
76 changes: 76 additions & 0 deletions pkg/gofr/datasource/kv-store/dynamodb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# DynamoDB Client

This package provides a DynamoDB client for use as a key-value store in GoFr applications.
It supports basic CRUD operations (Set, Get, Delete) on items using a partition key, along with health checks.
The client integrates logging, metrics, and tracing for observability.

## Quick Start
Import the package and create the client instance:
```Go
import (
"context"

"gofr.dev/pkg/gofr/datasource/kv/dynamodb" // Adjust path as needed
)

func main() {
configs := dynamodb.Configs{
Table: "your-table-name",
Region: "us-east-1",
Endpoint: "", // Leave empty for real AWS; set for local (e.g., "http://localhost:8000")
PartitionKeyName: "pk", // Default is "pk" if not specified
}

client := dynamodb.New(configs)
client.UseLogger(yourLogger) // Implement dynamodb.Logger interface
client.UseMetrics(yourMetrics) // Implement dynamodb.Metrics interface
// client.UseTracer(yourTracer) // Optional: trace.Tracer

if err := client.Connect(); err != nil {
// Handle connection error
}

ctx := context.Background()
key := "example-key"
attributes := map[string]any{"field": "value"}

// Set item
client.Set(ctx, key, attributes)

// Get item
result, _ := client.Get(ctx, key)

// Delete item
client.Delete(ctx, key)
}
```

## Configuration
The Configs struct defines the client settings:

* `Table`: Required. Name of the DynamoDB table.
* `Region`: Required. AWS region (e.g., "us-east-1").
* `Endpoint`: Optional. Custom endpoint URL (e.g., for local DynamoDB).
* `PartitionKeyName`: Optional. Partition key attribute name (defaults to "pk").

The table must have a string partition key (no sort key support).

## Usage

**Operations**
* `Set(ctx context.Context, key string, attributes map[string]any) error`: Stores attributes under the given key. Overwrites existing items.
* `Get(ctx context.Context, key string) (map[string]any, error)`: Retrieves attributes for the key. Returns error if key not found.
* `Delete(ctx context.Context, key string) error`: Deletes the item by key.
* `HealthCheck(ctx context.Context) (any, error)`: Checks table status via DescribeTable. Returns a `Health` struct with status ("UP" or "DOWN") and details.

All operations log details, record metrics (e.g., duration histograms), and support tracing spans.

## Observability
* Logging: Use `UseLogger` to inject a logger implementing the `Logger` interface. Logs details and errors.
* Metrics: Use `UseMetrics` to inject metrics implementing the `Metrics` interface. Records histograms for query durations.
* Tracing: Use `UseTracer` to inject an OpenTelemetry tracer. Adds spans for each operation.

## Notes
* This client assumes a simple key-value model; no support for sort keys, queries, or scans.
* For production, use IAM roles or credentials via AWS config.
* Ensure table exists before connecting; health check verifies accessibility.
255 changes: 255 additions & 0 deletions pkg/gofr/datasource/kv-store/dynamodb/dynamo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package dynamodb

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var errStatusDown = errors.New("status down")
var errKeyNotFound = errors.New("key not found")

type Configs struct {
Table string
Region string
Endpoint string
PartitionKeyName string
}
type dynamoDBInterface interface {
PutItem(
ctx context.Context,
params *dynamodb.PutItemInput,
optFns ...func(*dynamodb.Options),
) (*dynamodb.PutItemOutput, error)
GetItem(
ctx context.Context,
params *dynamodb.GetItemInput,
optFns ...func(*dynamodb.Options),
) (*dynamodb.GetItemOutput, error)
DeleteItem(
ctx context.Context,
params *dynamodb.DeleteItemInput,
optFns ...func(*dynamodb.Options),
) (*dynamodb.DeleteItemOutput, error)
DescribeTable(
ctx context.Context,
params *dynamodb.DescribeTableInput,
optFns ...func(*dynamodb.Options),
) (*dynamodb.DescribeTableOutput, error)
}

type Client struct {
db dynamoDBInterface
configs *Configs
logger Logger
metrics Metrics
tracer trace.Tracer
}

func New(configs Configs) *Client {
if configs.PartitionKeyName == "" {
configs.PartitionKeyName = "pk"
}

return &Client{configs: &configs}
}

// UseLogger sets the logger for the Dynamo client which asserts the Logger interface.
func (c *Client) UseLogger(logger any) {
if l, ok := logger.(Logger); ok {
c.logger = l
}
}

// UseMetrics sets the metrics for the Dynamo client which asserts the Metrics interface.
func (c *Client) UseMetrics(metrics any) {
if m, ok := metrics.(Metrics); ok {
c.metrics = m
}
}

// UseTracer sets the tracer for Dynamo client.
func (c *Client) UseTracer(tracer any) {
if tracer, ok := tracer.(trace.Tracer); ok {
c.tracer = tracer
}
}

func (c *Client) Connect() error {
c.logger.Debugf("connecting to DynamoDB table %v in region %v", c.configs.Table, c.configs.Region)

dynamoBuckets := []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}
c.metrics.NewHistogram("app_dynamodb_duration_ms", "Response time of DynamoDB queries in milliseconds.", dynamoBuckets...)

awsCfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(c.configs.Region))
if err != nil {
c.logger.Errorf("error loading AWS config: %v", err)
return fmt.Errorf("failed to load AWS config: %w", err)
}

var opts []func(*dynamodb.Options)

if c.configs.Endpoint != "" {
opts = append(opts, func(o *dynamodb.Options) {
o.BaseEndpoint = aws.String(c.configs.Endpoint)
})
}

db := dynamodb.NewFromConfig(awsCfg, opts...)
c.db = db

c.logger.Infof("connected to DynamoDB table %v in region %v", c.configs.Table, c.configs.Region)

return nil
}

func (c *Client) Get(ctx context.Context, key string) (map[string]any, error) {
span := c.addTrace(ctx, "get", key)
defer c.sendOperationsStats(time.Now(), "GET", span, key)

input := &dynamodb.GetItemInput{
TableName: aws.String(c.configs.Table),
Key: map[string]types.AttributeValue{
c.configs.PartitionKeyName: &types.AttributeValueMemberS{Value: key},
},
}

out, err := c.db.GetItem(ctx, input)
if err != nil {
c.logger.Errorf("error while fetching data for key: %v, error: %v", key, err)
return nil, err
}

if out.Item == nil {
return nil, errKeyNotFound
}

var result map[string]any
err = attributevalue.UnmarshalMap(out.Item, &result)

if err != nil {
c.logger.Errorf("error unmarshalling item for key: %v, error: %v", key, err)
return nil, err
}

delete(result, c.configs.PartitionKeyName)

return result, nil
}

func (c *Client) Set(ctx context.Context, key string, attributes map[string]any) error {
span := c.addTrace(ctx, "set", key)
defer c.sendOperationsStats(time.Now(), "SET", span, key)

itemAV, err := attributevalue.MarshalMap(attributes)
if err != nil {
c.logger.Errorf("error marshaling attributes for key: %v, error: %v", key, err)
return err
}

itemAV[c.configs.PartitionKeyName] = &types.AttributeValueMemberS{Value: key}
input := &dynamodb.PutItemInput{
TableName: aws.String(c.configs.Table),
Item: itemAV,
}

_, err = c.db.PutItem(ctx, input)
if err != nil {
c.logger.Errorf("error while setting data for key: %v, error: %v", key, err)
return err
}

return nil
}

func (c *Client) Delete(ctx context.Context, key string) error {
span := c.addTrace(ctx, "delete", key)
defer c.sendOperationsStats(time.Now(), "DELETE", span, key)

input := &dynamodb.DeleteItemInput{
TableName: aws.String(c.configs.Table),
Key: map[string]types.AttributeValue{
c.configs.PartitionKeyName: &types.AttributeValueMemberS{Value: key},
},
}

_, err := c.db.DeleteItem(ctx, input)

if err != nil {
c.logger.Errorf("error while deleting data for key: %v, error: %v", key, err)

return err
}

return nil
}

type Health struct {
Status string `json:"status,omitempty"`
Details map[string]any `json:"details,omitempty"`
}

func (c *Client) HealthCheck(ctx context.Context) (any, error) {
h := Health{
Details: make(map[string]any),
}

h.Details["table"] = c.configs.Table
h.Details["region"] = c.configs.Region

input := &dynamodb.DescribeTableInput{TableName: aws.String(c.configs.Table)}

_, err := c.db.DescribeTable(ctx, input)
if err != nil {
h.Status = "DOWN"

return &h, errStatusDown
}

h.Status = "UP"

return &h, nil
}

func (c *Client) sendOperationsStats(start time.Time, methodType string,
span trace.Span, kv ...string) {
duration := time.Since(start).Microseconds()

c.logger.Debug(&Log{
Type: methodType,
Duration: duration,
Key: strings.Join(kv, " "),
})

if span != nil {
defer span.End()
span.SetAttributes(attribute.Int64("dynamodb.duration_us", duration))
}

c.metrics.RecordHistogram(context.Background(), "app_dynamodb_duration_ms", float64(duration), "table", c.configs.Table,
"type", methodType)
}

func (c *Client) addTrace(ctx context.Context, method, key string) trace.Span {
if c.tracer != nil {
_, span := c.tracer.Start(ctx, fmt.Sprintf("dynamodb-%v", method))
span.SetAttributes(
attribute.String("dynamodb.method", method),
attribute.String("dynamodb.key", key),
)

return span
}

return nil
}
Loading