Skip to content

Commit 352764d

Browse files
authored
Merge pull request #2 from randlabs/httpclient
Added a load-balanced http client library code.
2 parents bc7c1b3 + 081a4cb commit 352764d

File tree

12 files changed

+1075
-23
lines changed

12 files changed

+1075
-23
lines changed

README.md

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
A round-robin server selection (aka load balancer) library.
44

5-
**IMPORTANT NOTE**: This library DOES NOT DO any kind of networking. It aims to automatically select an upstream handler in a set of primary and backup configured servers.
5+
The base code of this library, the balancer, *DOES NOT DO* any kind of network access. It's goal is to automatically select an upstream handler in a set of primary and backup configured servers.
66

77
## Usage with example
88

@@ -81,5 +81,61 @@ func main() {
8181
}
8282
```
8383

84+
## httpclient
85+
86+
The `httpclient` module, implements an alternative to `http.Client` that allows to use a set of servers and balance
87+
requests among them.
88+
89+
> Most load-balanced http client libraries makes use of the `RoundTripper` interface, but we don't.
90+
>
91+
> The major reason for this is we want to allow the dev, to be able to mark a server (temporarily) offline or retry
92+
> the operation, not only if connection to the server is established, but also depending on the response.
93+
>
94+
> For e.g., let's say your backend correctly answers a request but the output indicates the internal processing is not
95+
> up-to-date, then you can decide to stop using that server until it is.
96+
97+
### Usage:
98+
99+
```golang
100+
import (
101+
"fmt"
102+
103+
balancer "github.com/randlabs/go-loadbalancer"
104+
)
105+
106+
func main() {
107+
hc := httpclient.Create()
108+
_ = hc.AddSource("https://server1.test-network", httpclient.SourceOptions{
109+
ServerOptions: httpclient.ServerOptions{
110+
Weight: 1,
111+
MaxFails: 1,
112+
FailTimeout: 10 * time.Second,
113+
},
114+
})
115+
_ = hc.AddSource("https://server2.test-network", httpclient.SourceOptions{
116+
ServerOptions: httpclient.ServerOptions{
117+
Weight: 1,
118+
MaxFails: 1,
119+
FailTimeout: 10 * time.Second,
120+
},
121+
})
122+
123+
req := hc.NewRequest("GET", "/api-test")
124+
err := req.Exec(context.Background(), func (ctx context.Context, res httpclient.Response) error {
125+
if res.Err() != nil || res.StatusCode != 200 {
126+
// Retry on the next available server on failed request
127+
res.RetryOnNextServer()
128+
return nil
129+
}
130+
131+
// Process response
132+
// ...
133+
134+
// Done
135+
return nil
136+
})
137+
}
138+
```
139+
84140
## License
85141
See `LICENSE` file for details.

httpclient/error.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package httpclient
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"net"
7+
)
8+
9+
// -----------------------------------------------------------------------------
10+
11+
const (
12+
errorTypeIsTimeout = 1
13+
errorTypeIsCanceled = 2
14+
)
15+
16+
// -----------------------------------------------------------------------------
17+
18+
// Error is the error type usually returned by us.
19+
type Error struct {
20+
message string
21+
url string
22+
statusCode int
23+
errType int
24+
// err is the underlying error that occurred during the operation.
25+
err error
26+
}
27+
28+
// -----------------------------------------------------------------------------
29+
30+
func (c *HttpClient) newError(wrappedErr error, message string, url string, statusCode int) *Error {
31+
err := Error{
32+
message: message,
33+
url: url,
34+
statusCode: statusCode,
35+
err: wrappedErr,
36+
}
37+
return &err
38+
}
39+
40+
// -----------------------------------------------------------------------------
41+
42+
func (e *Error) URL() string {
43+
return e.url
44+
}
45+
46+
func (e *Error) StatusCode() int {
47+
return e.statusCode
48+
}
49+
50+
func (e *Error) Unwrap() error {
51+
return e.err
52+
}
53+
54+
func (e *Error) Error() string {
55+
if e == nil {
56+
return "<nil>"
57+
}
58+
s := e.message + fmt.Sprintf(" [URL=%v]", e.url)
59+
if e.err != nil {
60+
s += " [err=" + e.err.Error() + "]"
61+
}
62+
return s
63+
}
64+
65+
func (e *Error) IsTimeout() bool {
66+
return e.errType == errorTypeIsTimeout
67+
}
68+
69+
func (e *Error) IsCanceled() bool {
70+
return e.errType == errorTypeIsCanceled
71+
}
72+
73+
func (e *Error) IsNetworkError() bool {
74+
if e.err != nil {
75+
var netErr net.Error
76+
var netOpErr *net.OpError
77+
var netDnsErr *net.DNSError
78+
79+
if errors.As(e.err, &netErr) || errors.As(e.err, &netOpErr) || errors.As(e.err, &netDnsErr) {
80+
return true
81+
}
82+
}
83+
return false
84+
}

httpclient/exec.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package httpclient
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"io"
8+
"net"
9+
"net/http"
10+
"strings"
11+
"time"
12+
)
13+
14+
// -----------------------------------------------------------------------------
15+
16+
const (
17+
defaultTimeout = 20 * time.Second
18+
19+
errUnableToExecuteRequest = "failed to execute http request"
20+
errNoAvailableServer = "no available upstream server"
21+
)
22+
23+
// -----------------------------------------------------------------------------
24+
25+
func (c *HttpClient) exec(parentCtx context.Context, cb ExecCallback, req *Request) error {
26+
var httpreq *http.Request
27+
var getBody func() io.ReadCloser
28+
var err error
29+
30+
// Define a body getter to return multiple copies of the reader to be used in retries.
31+
if req.Body == nil {
32+
// If no body, getter will return nil
33+
getBody = func() io.ReadCloser {
34+
return nil
35+
}
36+
} else {
37+
// Convert to a ReadCloser if just a reader
38+
rc, ok := req.Body.(io.ReadCloser)
39+
if !ok {
40+
rc = io.NopCloser(req.Body)
41+
}
42+
43+
// Defer close of the original body
44+
defer func() {
45+
_ = rc.Close()
46+
}()
47+
48+
// Set up a body reader cloning function
49+
switch v := req.Body.(type) {
50+
case *bytes.Buffer:
51+
buf := v.Bytes()
52+
getBody = func() io.ReadCloser {
53+
r := bytes.NewReader(buf)
54+
return io.NopCloser(r)
55+
}
56+
57+
case *bytes.Reader:
58+
snapshot := *v
59+
getBody = func() io.ReadCloser {
60+
r := snapshot
61+
return io.NopCloser(&r)
62+
}
63+
64+
case *strings.Reader:
65+
snapshot := *v
66+
getBody = func() io.ReadCloser {
67+
r := snapshot
68+
return io.NopCloser(&r)
69+
}
70+
71+
default:
72+
return errors.New("unsupported body reader")
73+
}
74+
}
75+
76+
// Initialize retry counter
77+
retryCounter := 0
78+
79+
// Loop
80+
for {
81+
// Get next available server
82+
srv := c.lb.Next()
83+
if srv == nil {
84+
return c.newError(nil, errNoAvailableServer, req.Resource, 0)
85+
}
86+
87+
src := srv.UserData().(*Source)
88+
89+
// Create the final url
90+
url := src.baseURL + req.Resource
91+
92+
// Create a new http request
93+
httpreq, err = http.NewRequest(req.Method, url, getBody())
94+
if err != nil {
95+
err = c.newError(err, errUnableToExecuteRequest, url, 0)
96+
src.setLastError(err)
97+
return err
98+
}
99+
100+
// Add load balancer source headers
101+
httpreq.Header = src.header.Clone()
102+
103+
// Add request headers
104+
for k, v := range req.Header {
105+
vLen := len(v)
106+
if vLen > 0 {
107+
httpreq.Header.Set(k, v[0])
108+
for vIdx := 1; vIdx < vLen; vIdx++ {
109+
httpreq.Header.Add(k, v[vIdx])
110+
}
111+
}
112+
}
113+
114+
// Create http client requester
115+
client := http.Client{
116+
Transport: c.transport,
117+
}
118+
119+
// Establish a new context with a default timeout if a deadline is not present
120+
ctx := parentCtx
121+
var cancelCtx context.CancelFunc
122+
if _, hasDeadline := parentCtx.Deadline(); !hasDeadline {
123+
ctx, cancelCtx = context.WithTimeout(parentCtx, defaultTimeout)
124+
}
125+
126+
// Build callback info
127+
upstreamOffline := false
128+
retry := false
129+
execResult := Response{
130+
fullUrl: url,
131+
source: src,
132+
retryCount: retryCounter,
133+
upstreamOffline: &upstreamOffline,
134+
retry: &retry,
135+
}
136+
137+
// Execute real request
138+
execResult.Response, err = client.Do(httpreq.WithContext(ctx))
139+
if err != nil {
140+
var netErr net.Error
141+
142+
if errors.Is(err, context.DeadlineExceeded) {
143+
// Deadline exceeded?
144+
if cancelCtx != nil {
145+
cancelCtx() // To avoid defer calling inside a for loop and warnings, we call it here
146+
}
147+
return ErrTimeout
148+
}
149+
if errors.As(err, &netErr); netErr.Timeout() {
150+
// Network timeout?
151+
srv.SetOffline()
152+
153+
err = ErrTimeout
154+
} else if errors.Is(err, context.Canceled) {
155+
// Canceled?
156+
if cancelCtx != nil {
157+
cancelCtx() // To avoid defer calling inside a for loop and warnings, we call it here
158+
}
159+
return ErrCanceled
160+
} else {
161+
// Other type of error
162+
srv.SetOffline()
163+
164+
err = c.newError(err, errUnableToExecuteRequest, url, 0)
165+
}
166+
}
167+
168+
// Set error in callback
169+
execResult.err = err
170+
171+
// Call the callback
172+
err = cb(parentCtx, execResult)
173+
174+
// To avoid defer calling inside a for loop and warnings, we call it here
175+
if cancelCtx != nil {
176+
cancelCtx()
177+
}
178+
179+
// Close the response body if one exist
180+
if execResult.Response != nil {
181+
_ = execResult.Response.Body.Close()
182+
}
183+
184+
// Set the last error (even success)
185+
src.setLastError(err)
186+
187+
// Raise callbak
188+
c.raiseRequestEvent(srv, err)
189+
190+
// Set server online/offline based on the callback response
191+
if !upstreamOffline {
192+
srv.SetOnline()
193+
} else {
194+
srv.SetOffline()
195+
}
196+
197+
// Should we retry on next server?
198+
if !retry {
199+
break
200+
}
201+
202+
// Increment retry counter
203+
retryCounter += 1
204+
}
205+
206+
// Done
207+
return err
208+
}

0 commit comments

Comments
 (0)