@@ -5,16 +5,28 @@ import (
5
5
"fmt"
6
6
"log"
7
7
"math"
8
+ "reflect"
9
+ "sort"
8
10
"sync"
9
11
"time"
10
12
13
+ "github.com/mitchellh/mapstructure"
14
+
11
15
"github.com/AppsFlyer/go-consul-resolver/lb"
12
16
"github.com/cenkalti/backoff/v4"
13
17
"github.com/friendsofgo/errors"
14
18
"github.com/hashicorp/consul/api"
15
19
"go.uber.org/ratelimit"
16
20
)
17
21
22
+ type agentConfig struct {
23
+ DC string `mapstructure:"Datacenter"`
24
+ }
25
+
26
+ type agentSelf struct {
27
+ Config agentConfig `mapstructure:"Config"`
28
+ }
29
+
18
30
// Balancer interface provides methods for selecting a target and updating its state
19
31
type Balancer interface {
20
32
// Select returns a *api.ServiceEntry describing the selected target.
@@ -31,14 +43,16 @@ type ServiceProvider interface {
31
43
}
32
44
33
45
type ServiceResolver struct {
34
- log LogFn
35
- ctx context.Context
36
- client ServiceProvider
37
- queryOpts * api.QueryOptions
38
- balancer Balancer
39
- spec ServiceSpec
40
- init chan struct {}
41
- initDone sync.Once
46
+ log LogFn
47
+ ctx context.Context
48
+ client ServiceProvider
49
+ queryOpts * api.QueryOptions
50
+ balancer Balancer
51
+ spec ServiceSpec
52
+ prioritizedInstances [][]* api.ServiceEntry
53
+ mu sync.Mutex
54
+ init chan struct {}
55
+ initDone sync.Once
42
56
}
43
57
44
58
// NewConsulResolver creates a new Consul Resolver
@@ -69,18 +83,40 @@ func NewConsulResolver(ctx context.Context, conf ResolverConfig) (*ServiceResolv
69
83
conf .Log = log .Printf
70
84
}
71
85
86
+ datacenters := []string {"" }
87
+ if len (conf .FallbackDatacenters ) > 0 {
88
+ seen := map [string ]struct {}{}
89
+ // Exclude the local datacenter from the list of fallback datacenters
90
+ localDC , err := getLocalDatacenter (conf .Client .Agent ())
91
+ if err != nil {
92
+ return nil , errors .Wrap (err , "failed determining local consul datacenter" )
93
+ }
94
+
95
+ for _ , dc := range conf .FallbackDatacenters {
96
+ if _ , ok := seen [dc ]; ok || dc == localDC {
97
+ continue
98
+ }
99
+ seen [dc ] = struct {}{}
100
+ datacenters = append (datacenters , dc )
101
+ }
102
+ }
103
+
72
104
resolver := & ServiceResolver {
73
- log : conf .Log ,
74
- ctx : ctx ,
75
- queryOpts : conf .Query ,
76
- spec : conf .ServiceSpec ,
77
- client : conf .Client .Health (),
78
- balancer : conf .Balancer ,
79
- init : make (chan struct {}),
80
- initDone : sync.Once {},
105
+ log : conf .Log ,
106
+ ctx : ctx ,
107
+ queryOpts : conf .Query ,
108
+ spec : conf .ServiceSpec ,
109
+ client : conf .Client .Health (),
110
+ balancer : conf .Balancer ,
111
+ prioritizedInstances : make ([][]* api.ServiceEntry , len (datacenters )),
112
+ init : make (chan struct {}),
113
+ initDone : sync.Once {},
81
114
}
82
115
83
- go resolver .populateFromConsul ()
116
+ // Always prepend the local datacenter with the highest priority
117
+ for priority , dc := range datacenters {
118
+ go resolver .populateFromConsul (dc , priority )
119
+ }
84
120
85
121
return resolver , nil
86
122
}
@@ -127,13 +163,16 @@ func (r *ServiceResolver) Resolve(ctx context.Context) (ServiceAddress, error) {
127
163
return ServiceAddress {Host : host , Port : port }, nil
128
164
}
129
165
130
- func (r * ServiceResolver ) populateFromConsul () {
166
+ func (r * ServiceResolver ) populateFromConsul (dcName string , dcPriority int ) {
131
167
rl := ratelimit .New (1 ) // limit consul queries to 1 per second
132
168
bck := backoff .NewExponentialBackOff ()
133
169
bck .MaxElapsedTime = 0
134
170
bck .MaxInterval = time .Second * 30
135
171
136
- r .queryOpts .WaitIndex = 0
172
+ q := * r .queryOpts
173
+
174
+ q .WaitIndex = 0
175
+ q .Datacenter = dcName
137
176
for r .ctx .Err () == nil {
138
177
rl .Take ()
139
178
err := backoff .RetryNotify (
@@ -142,18 +181,21 @@ func (r *ServiceResolver) populateFromConsul() {
142
181
r .spec .ServiceName ,
143
182
r .spec .Tags ,
144
183
! r .spec .IncludeUnhealthy ,
145
- r . queryOpts ,
184
+ & q ,
146
185
)
147
186
if err != nil {
148
187
return err
149
188
}
150
- if meta .LastIndex < r . queryOpts .WaitIndex {
151
- r . queryOpts .WaitIndex = 0
189
+ if meta .LastIndex < q .WaitIndex {
190
+ q .WaitIndex = 0
152
191
} else {
153
- r .queryOpts .WaitIndex = uint64 (math .Max (float64 (1 ), float64 (meta .LastIndex )))
192
+ q .WaitIndex = uint64 (math .Max (float64 (1 ), float64 (meta .LastIndex )))
193
+ }
194
+
195
+ if targets , shouldUpdate := r .getTargetsForUpdate (se , dcPriority ); shouldUpdate {
196
+ r .balancer .UpdateTargets (targets )
154
197
}
155
198
156
- r .balancer .UpdateTargets (se )
157
199
r .initDone .Do (func () {
158
200
close (r .init )
159
201
})
@@ -170,3 +212,54 @@ func (r *ServiceResolver) populateFromConsul() {
170
212
}
171
213
r .log ("[Consul Resolver] context canceled, stopping consul watcher" )
172
214
}
215
+
216
+ // getTargetsForUpdate will update the LB only if:
217
+ // - The DC has healthy nodes
218
+ // - No DC with higher priority has healthy nodes
219
+ func (r * ServiceResolver ) getTargetsForUpdate (se []* api.ServiceEntry , priority int ) ([]* api.ServiceEntry , bool ) {
220
+ sort .SliceStable (se , func (i , j int ) bool {
221
+ return se [i ].Node .ID < se [j ].Node .ID
222
+ })
223
+
224
+ r .mu .Lock ()
225
+ defer r .mu .Unlock ()
226
+
227
+ var found bool
228
+ // check if the target list is unchanged
229
+ if reflect .DeepEqual (se , r .prioritizedInstances [priority ]) {
230
+ return nil , false
231
+ }
232
+ r .prioritizedInstances [priority ] = se
233
+ for i := 0 ; i <= len (r .prioritizedInstances )- 1 ; i ++ {
234
+ if len (r .prioritizedInstances [i ]) == 0 {
235
+ continue
236
+ }
237
+ found = true
238
+ if priority > i {
239
+ break
240
+ }
241
+
242
+ return r .prioritizedInstances [i ], true
243
+ }
244
+
245
+ // If no DC has any nodes, return an empty slice and signal the caller that an update is needed
246
+ if ! found {
247
+ return se , true
248
+ }
249
+
250
+ return se , false
251
+ }
252
+
253
+ func getLocalDatacenter (c * api.Agent ) (string , error ) {
254
+ res , err := c .Self ()
255
+ if err != nil {
256
+ return "" , errors .Wrap (err , "failed querying agent" )
257
+ }
258
+
259
+ var self agentSelf
260
+ if err := mapstructure .Decode (res , & self ); err != nil {
261
+ return "" , errors .Wrap (err , "failed decoding agent configuration" )
262
+ }
263
+
264
+ return self .Config .DC , nil
265
+ }
0 commit comments