Skip to content

Commit 31d7464

Browse files
authored
chore(app,outbound)!: Decouple metrics registry from stack building (#2887)
As we introduced the newer prometheus-client metrics registry, we did so by allowing stacks to register metrics directly. This is incongruent with the existing legacy metrics registries, which are known by the Inbound and Outbound stack builders. This leads to two problems: 1. We cannot build the admin server until all of the proxy stacks are built. This ordering dependency is unnecessary and cumbersome if we want to insert additional discovery work into the initialization process. 2. It is cumbersome to add new metrics to stacks, as the registry must be passed through the stack building process. To fix this, this change introduces additional Metrics types so that these metrics may be registered along with the other stack metrics. BREAKING CHANGE: Gateway-mode proxies now report Balancer metrics with the "outbound_" prefix instead of the "gateway_" prefix. This metrics scope was introduced very recently and is incongruent with our other metrics export. We have no known readers of these metrics, as they are only relevant to new load balancer behavior, and only on multicluster gateways. Unifying under the outbound_ prefix reduces noise in metrics in non-gateway use cases and makes it easier to query proxy data consistently. This is the only functional change in this commit.
1 parent 3c8012c commit 31d7464

File tree

27 files changed

+213
-179
lines changed

27 files changed

+213
-179
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2255,6 +2255,7 @@ dependencies = [
22552255
"jemallocator",
22562256
"linkerd-app",
22572257
"linkerd-meshtls",
2258+
"linkerd-metrics",
22582259
"linkerd-signal",
22592260
"num_cpus",
22602261
"tokio",

linkerd/app/core/src/control.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,30 @@ impl fmt::Display for ControlAddr {
7272
pub type RspBody =
7373
linkerd_http_metrics::requests::ResponseBody<http::balance::Body<hyper::Body>, classify::Eos>;
7474

75+
#[derive(Clone, Debug, Default)]
76+
pub struct Metrics {
77+
balance: balance::Metrics,
78+
}
79+
7580
const EWMA_CONFIG: http::balance::EwmaConfig = http::balance::EwmaConfig {
7681
default_rtt: time::Duration::from_millis(30),
7782
decay: time::Duration::from_secs(10),
7883
};
7984

85+
impl Metrics {
86+
pub fn register(registry: &mut prom::Registry) -> Self {
87+
Metrics {
88+
balance: balance::Metrics::register(registry.sub_registry_with_prefix("balancer")),
89+
}
90+
}
91+
}
92+
8093
impl Config {
8194
pub fn build(
8295
self,
8396
dns: dns::Resolver,
84-
metrics: metrics::ControlHttp,
85-
registry: &mut prom::Registry,
97+
legacy_metrics: metrics::ControlHttp,
98+
metrics: Metrics,
8699
identity: identity::NewClient,
87100
) -> svc::ArcNewService<
88101
(),
@@ -133,12 +146,8 @@ impl Config {
133146

134147
let balance = endpoint
135148
.lift_new()
136-
.push(self::balance::layer(
137-
registry.sub_registry_with_prefix("balancer"),
138-
dns,
139-
resolve_backoff,
140-
))
141-
.push(metrics.to_layer::<classify::Response, _, _>())
149+
.push(self::balance::layer(metrics.balance, dns, resolve_backoff))
150+
.push(legacy_metrics.to_layer::<classify::Response, _, _>())
142151
.push(classify::NewClassify::layer_default());
143152

144153
balance
@@ -233,25 +242,30 @@ mod balance {
233242
use super::{client::Target, ControlAddr};
234243
use crate::{
235244
dns,
236-
metrics::prom::{self, encoding::EncodeLabelSet},
245+
metrics::prom::encoding::EncodeLabelSet,
237246
proxy::{dns_resolve::DnsResolve, http, resolve::recover},
238247
svc, tls,
239248
};
240249
use linkerd_stack::ExtractParam;
241250
use std::net::SocketAddr;
242251

252+
pub(super) type Metrics = http::balance::MetricFamilies<Labels>;
253+
243254
pub fn layer<B, R: Clone, N>(
244-
registry: &mut prom::Registry,
255+
metrics: Metrics,
245256
dns: dns::Resolver,
246257
recover: R,
247258
) -> impl svc::Layer<
248259
N,
249260
Service = http::NewBalance<B, Params, recover::Resolve<R, DnsResolve>, NewIntoTarget<N>>,
250261
> {
251262
let resolve = recover::Resolve::new(recover, DnsResolve::new(dns));
252-
let metrics = Params(http::balance::MetricFamilies::register(registry));
253263
svc::layer::mk(move |inner| {
254-
http::NewBalance::new(NewIntoTarget { inner }, resolve.clone(), metrics.clone())
264+
http::NewBalance::new(
265+
NewIntoTarget { inner },
266+
resolve.clone(),
267+
Params(metrics.clone()),
268+
)
255269
})
256270
}
257271

@@ -270,7 +284,7 @@ mod balance {
270284
}
271285

272286
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
273-
struct Labels {
287+
pub(super) struct Labels {
274288
addr: String,
275289
}
276290

linkerd/app/gateway/src/http.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::Gateway;
22
use inbound::{GatewayAddr, GatewayDomainInvalid};
33
use linkerd_app_core::{
4-
metrics::{prom, ServerLabel},
4+
metrics::ServerLabel,
55
profiles,
66
proxy::{
77
api_resolve::{ConcreteAddr, Metadata},
@@ -47,7 +47,6 @@ impl Gateway {
4747
/// outbound router.
4848
pub fn http<T, R>(
4949
&self,
50-
registry: &mut prom::Registry,
5150
inner: svc::ArcNewHttp<
5251
outbound::http::concrete::Endpoint<
5352
outbound::http::logical::Concrete<outbound::http::Http<Target>>,
@@ -86,7 +85,7 @@ impl Gateway {
8685
.outbound
8786
.clone()
8887
.with_stack(inner)
89-
.push_http_cached(outbound::http::HttpMetrics::register(registry), resolve)
88+
.push_http_cached(resolve)
9089
.into_stack()
9190
// Discard `T` and its associated client-specific metadata.
9291
.push_map_target(Target::discard_parent)

linkerd/app/gateway/src/http/tests.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ async fn upgraded_request_remains_relative_form() {
160160
);
161161
gateway
162162
.http(
163-
&mut Default::default(),
164163
svc::ArcNewHttp::new(move |_: _| svc::BoxHttp::new(inner.clone())),
165164
resolve,
166165
)

linkerd/app/gateway/src/lib.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
#![forbid(unsafe_code)]
44

55
use linkerd_app_core::{
6-
io,
7-
metrics::prom,
8-
profiles,
6+
io, profiles,
97
proxy::{
108
api_resolve::{ConcreteAddr, Metadata},
119
core::Resolve,
@@ -50,7 +48,6 @@ impl Gateway {
5048
/// stack.
5149
pub fn stack<T, I, R>(
5250
self,
53-
registry: &mut prom::Registry,
5451
resolve: R,
5552
profiles: impl profiles::GetProfile<Error = Error>,
5653
policies: impl outbound::policy::GetPolicy,
@@ -73,12 +70,8 @@ impl Gateway {
7370
R::Resolution: Unpin,
7471
{
7572
let opaq = {
76-
let registry = registry.sub_registry_with_prefix("tcp");
7773
let resolve = resolve.clone();
78-
let opaq = self
79-
.outbound
80-
.to_tcp_connect()
81-
.push_opaq_cached(registry, resolve);
74+
let opaq = self.outbound.to_tcp_connect().push_opaq_cached(resolve);
8275
self.opaq(opaq.into_inner()).into_inner()
8376
};
8477

@@ -88,7 +81,7 @@ impl Gateway {
8881
.to_tcp_connect()
8982
.push_tcp_endpoint()
9083
.push_http_tcp_client();
91-
let http = self.http(registry, http.into_inner(), resolve);
84+
let http = self.http(http.into_inner(), resolve);
9285
self.inbound
9386
.clone()
9487
.with_stack(http.into_inner())

linkerd/app/integration/src/proxy.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,14 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
457457
let bind_adm = listen::BindTcp::default();
458458
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
459459
let main = config
460-
.build(bind_in, bind_out, bind_adm, shutdown_tx, trace_handle)
460+
.build(
461+
bind_in,
462+
bind_out,
463+
bind_adm,
464+
shutdown_tx,
465+
trace_handle,
466+
Default::default(),
467+
)
461468
.await
462469
.expect("config");
463470

linkerd/app/outbound/src/discover/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async fn errors_propagate() {
3737

3838
// Create a profile stack that uses the tracked inner stack.
3939
let (rt, _shutdown) = runtime();
40-
let stack = Outbound::new(default_config(), rt)
40+
let stack = Outbound::new(default_config(), rt, &mut Default::default())
4141
.with_stack(stack)
4242
.push_discover(discover)
4343
.into_inner();
@@ -107,7 +107,7 @@ async fn caches_profiles_until_idle() {
107107
cfg
108108
};
109109
let (rt, _shutdown) = runtime();
110-
let stack = Outbound::new(cfg, rt)
110+
let stack = Outbound::new(cfg, rt, &mut Default::default())
111111
.with_stack(stack)
112112
.push_discover(discover)
113113
.into_inner();

linkerd/app/outbound/src/http.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,7 @@ impl<T> Outbound<svc::ArcNewHttp<concrete::Endpoint<logical::Concrete<Http<T>>>>
8686
/// Builds a stack that routes HTTP requests to endpoint stacks.
8787
///
8888
/// Buffered concrete services are cached in and evicted when idle.
89-
pub fn push_http_cached<R>(
90-
self,
91-
metrics: HttpMetrics,
92-
resolve: R,
93-
) -> Outbound<svc::ArcNewCloneHttp<T>>
89+
pub fn push_http_cached<R>(self, resolve: R) -> Outbound<svc::ArcNewCloneHttp<T>>
9490
where
9591
// Logical HTTP target.
9692
T: svc::Param<http::Version>,
@@ -101,8 +97,8 @@ impl<T> Outbound<svc::ArcNewHttp<concrete::Endpoint<logical::Concrete<Http<T>>>>
10197
R::Resolution: Unpin,
10298
{
10399
self.push_http_endpoint()
104-
.push_http_concrete(metrics.balancer, resolve)
105-
.push_http_logical(metrics.http_route, metrics.grpc_route)
100+
.push_http_concrete(resolve)
101+
.push_http_logical()
106102
.map_stack(move |config, _, stk| {
107103
stk.push_new_idle_cached(config.discovery_idle_timeout)
108104
.push_map_target(Http)

linkerd/app/outbound/src/http/concrete.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,7 @@ impl<N> Outbound<N> {
6060
/// 'failfast'. While in failfast, buffered requests are failed and the
6161
/// service becomes unavailable so callers may choose alternate concrete
6262
/// services.
63-
pub fn push_http_concrete<T, NSvc, R>(
64-
self,
65-
balancer_metrics: balance::BalancerMetrics,
66-
resolve: R,
67-
) -> Outbound<svc::ArcNewCloneHttp<T>>
63+
pub fn push_http_concrete<T, NSvc, R>(self, resolve: R) -> Outbound<svc::ArcNewCloneHttp<T>>
6864
where
6965
// Concrete target type.
7066
T: svc::Param<ParentRef>,
@@ -105,12 +101,7 @@ impl<N> Outbound<N> {
105101
});
106102

107103
inner
108-
.push(balance::Balance::layer(
109-
config,
110-
rt,
111-
balancer_metrics,
112-
resolve,
113-
))
104+
.push(balance::Balance::layer(config, rt, resolve))
114105
.check_new_clone()
115106
.push_switch(Ok::<_, Infallible>, forward.into_inner())
116107
.push_switch(

linkerd/app/outbound/src/http/concrete/balance.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ where
8282
pub(super) fn layer<N, NSvc, R>(
8383
config: &crate::Config,
8484
rt: &crate::Runtime,
85-
metrics: BalancerMetrics,
8685
resolve: R,
8786
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
8887
where
@@ -101,6 +100,7 @@ where
101100
let http_queue = config.http_request_queue;
102101
let inbound_ips = config.inbound_ips.clone();
103102
let stack_metrics = rt.metrics.proxy.stack.clone();
103+
let balance_metrics = rt.metrics.prom.http.balancer.clone();
104104

105105
let resolve = svc::stack(resolve.into_service())
106106
.push_map_target(|t: Self| ConcreteAddr(t.addr))
@@ -148,7 +148,10 @@ where
148148
.push(svc::ArcNewService::layer());
149149

150150
endpoint
151-
.push(http::NewBalance::layer(resolve.clone(), metrics.clone()))
151+
.push(http::NewBalance::layer(
152+
resolve.clone(),
153+
balance_metrics.clone(),
154+
))
152155
.push_on_service(http::BoxResponse::layer())
153156
.push_on_service(stack_metrics.layer(stack_labels("http", "balance")))
154157
.push(svc::NewMapErr::layer_from_target::<BalanceError, _>())

0 commit comments

Comments
 (0)