Skip to content

Commit a0fc3cc

Browse files
committed
feat: added database/sql RunQueriesConcurrently function
1 parent 335a41e commit a0fc3cc

File tree

5 files changed

+70
-3
lines changed

5 files changed

+70
-3
lines changed
File renamed without changes.

sql/connection.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ func Connect(
4949
// Set the maximum idle connections
5050
db.SetMaxIdleConns(config.MaxIdleConnections)
5151

52+
db.Set
53+
5254
// Set the connection max lifetime
5355
db.SetConnMaxLifetime(config.ConnectionMaxLifetime)
5456

sql/queries.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package sql
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"sync"
7+
)
8+
9+
// RunQueriesConcurrently runs multiple queries concurrently
10+
func RunQueriesConcurrently(
11+
db *sql.DB,
12+
queries ...func(db *sql.DB, ctx context.Context) error,
13+
) *[]error {
14+
// Create a context with a cancellation function
15+
ctx, cancel := context.WithCancel(context.Background())
16+
defer cancel()
17+
18+
// Create a wait group
19+
var wg sync.WaitGroup
20+
wg.Add(len(queries))
21+
22+
// Create a channel to handle errors
23+
errCh := make(chan error)
24+
25+
// Execute the queries concurrently
26+
for _, query := range queries {
27+
go func(query func(db *sql.DB, ctx context.Context) error) {
28+
defer wg.Done()
29+
if err := query(db, ctx); err != nil {
30+
// Send the error to the channel
31+
errCh <- err
32+
33+
// Cancel the other queries
34+
cancel()
35+
}
36+
}(query)
37+
}
38+
39+
// Wait for all queries to complete
40+
wg.Wait()
41+
42+
// Close the error channel
43+
close(errCh)
44+
45+
// Return the errors if any
46+
var errors []error
47+
for err := range errCh {
48+
errors = append(errors, err)
49+
}
50+
51+
// Check if there are any errors
52+
if len(errors) == 0 {
53+
return nil
54+
}
55+
56+
return &errors
57+
}

sql/service/service.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package service
22

33
import (
4+
"context"
45
"database/sql"
56
godatabases "github.com/ralvarezdev/go-databases"
67
godatabasessql "github.com/ralvarezdev/go-databases/sql"
8+
godatabasessqltransaction "github.com/ralvarezdev/go-databases/sql/transaction"
79
"strings"
810
)
911

@@ -34,7 +36,7 @@ func NewDefaultService(db *sql.DB) (
3436
return nil, godatabases.ErrNilDatabase
3537
}
3638

37-
// Create the instance
39+
// CreateTransaction the instance
3840
instance = &DefaultService{
3941
db: db,
4042
}
@@ -70,6 +72,13 @@ func (d *DefaultService) RunTransaction(fn func(tx *sql.Tx) error) error {
7072
return godatabasessql.CreateTransaction(d.db, fn)
7173
}
7274

75+
// RunQueriesConcurrently runs queries concurrently
76+
func (d *DefaultService) RunQueriesConcurrently(
77+
queries ...func(db *sql.DB, ctx context.Context) error,
78+
) *[]error {
79+
return godatabasessql.RunQueriesConcurrently(d.db, queries...)
80+
}
81+
7382
// Exec executes a query with parameters and returns the result
7483
func (d *DefaultService) Exec(query *string, params ...interface{}) (
7584
sql.Result,

sql/transactions.go renamed to sql/transaction.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ func CreateTransaction(db *sql.DB, fn func(tx *sql.Tx) error) error {
1919
}
2020

2121
// Execute the transaction function
22-
fnErr := fn(tx)
23-
if fnErr != nil {
22+
if fnErr := fn(tx); fnErr != nil {
2423
err = tx.Rollback()
2524
if err != nil {
2625
return err

0 commit comments

Comments
 (0)