Skip to content

Commit 251dda7

Browse files
committed
Fast slow store directions
There are multiple use cases where we don't want a fast-slow store to persist to one of the stores in some direction. For example, worker nodes do not want to store build results on the local filesystem, just with the upstream CAS. Another case would be the re-use of prod action cache in a dev environment, but not vice-versa. This PR introduces options to the fast-slow store which default to the existing behaviour, but allows customisation of each side of the fast slow store to either persist in the case or get operations, put operations or to make them read only. Fixes #1577
1 parent 7afe286 commit 251dda7

File tree

8 files changed

+221
-13
lines changed

8 files changed

+221
-13
lines changed

deployment-examples/docker-compose/worker.json5

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
}
3434
}
3535
},
36+
"fast_direction": "get",
3637
"slow": {
3738
"ref_store": {
3839
"name": "GRPC_LOCAL_STORE"

kubernetes/components/worker/worker.json5

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
}
3333
}
3434
},
35+
"fast_direction": "get",
3536
"slow": {
3637
"ref_store": {
3738
"name": "GRPC_LOCAL_STORE"

nativelink-config/src/stores.rs

+30
Original file line numberDiff line numberDiff line change
@@ -514,16 +514,46 @@ pub struct FilesystemSpec {
514514
pub block_size: u64,
515515
}
516516

517+
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
518+
#[serde(rename_all = "snake_case")]
519+
pub enum StoreDirection {
520+
/// The store operates normally and all get and put operations are
521+
/// handled by it.
522+
#[default]
523+
Both,
524+
/// Update operations will cause persistence to this store, but Get
525+
/// operations will be ignored.
526+
/// This only makes sense on the fast store as the slow store will
527+
/// never get written to on Get anyway.
528+
Update,
529+
/// Get operations will cause persistence to this store, but Update
530+
/// operations will be ignored.
531+
Get,
532+
/// Operate as a read only store, only really makes sense if there's
533+
/// another way to write to it.
534+
ReadOnly,
535+
}
536+
517537
#[derive(Serialize, Deserialize, Debug, Clone)]
518538
#[serde(deny_unknown_fields)]
519539
pub struct FastSlowSpec {
520540
/// Fast store that will be attempted to be contacted before reaching
521541
/// out to the `slow` store.
522542
pub fast: StoreSpec,
523543

544+
/// How to handle the fast store. This can be useful to set to Get for
545+
/// worker nodes such that results are persisted to the slow store only.
546+
#[serde(default)]
547+
pub fast_direction: StoreDirection,
548+
524549
/// If the object does not exist in the `fast` store it will try to
525550
/// get it from this store.
526551
pub slow: StoreSpec,
552+
553+
/// How to handle the slow store. This can be useful if creating a diode
554+
/// and you wish to have an upstream read only store.
555+
#[serde(default)]
556+
pub slow_direction: StoreDirection,
527557
}
528558

529559
#[derive(Serialize, Deserialize, Debug, Default, Clone)]

nativelink-store/src/fast_slow_store.rs

+63-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::{Arc, Weak};
2121

2222
use async_trait::async_trait;
2323
use futures::{join, FutureExt};
24-
use nativelink_config::stores::FastSlowSpec;
24+
use nativelink_config::stores::{FastSlowSpec, StoreDirection};
2525
use nativelink_error::{make_err, Code, Error, ResultExt};
2626
use nativelink_metric::MetricsComponent;
2727
use nativelink_util::buf_channel::{
@@ -45,18 +45,22 @@ use nativelink_util::store_trait::{
4545
pub struct FastSlowStore {
4646
#[metric(group = "fast_store")]
4747
fast_store: Store,
48+
fast_direction: StoreDirection,
4849
#[metric(group = "slow_store")]
4950
slow_store: Store,
51+
slow_direction: StoreDirection,
5052
weak_self: Weak<Self>,
5153
#[metric]
5254
metrics: FastSlowStoreMetrics,
5355
}
5456

5557
impl FastSlowStore {
56-
pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
58+
pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
5759
Arc::new_cyclic(|weak_self| Self {
5860
fast_store,
61+
fast_direction: spec.fast_direction.clone(),
5962
slow_store,
63+
slow_direction: spec.slow_direction.clone(),
6064
weak_self: weak_self.clone(),
6165
metrics: FastSlowStoreMetrics::default(),
6266
})
@@ -155,12 +159,31 @@ impl StoreDriver for FastSlowStore {
155159
) -> Result<(), Error> {
156160
// If either one of our stores is a noop store, bypass the multiplexing
157161
// and just use the store that is not a noop store.
158-
let slow_store = self.slow_store.inner_store(Some(key.borrow()));
159-
if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
162+
let ignore_slow = self
163+
.slow_store
164+
.inner_store(Some(key.borrow()))
165+
.optimized_for(StoreOptimizations::NoopUpdates)
166+
|| self.slow_direction == StoreDirection::ReadOnly
167+
|| self.slow_direction == StoreDirection::Get;
168+
let ignore_fast = self
169+
.fast_store
170+
.inner_store(Some(key.borrow()))
171+
.optimized_for(StoreOptimizations::NoopUpdates)
172+
|| self.fast_direction == StoreDirection::ReadOnly
173+
|| self.fast_direction == StoreDirection::Get;
174+
if ignore_slow && ignore_fast {
175+
// We need to drain the reader to avoid the writer complaining that we dropped
176+
// the connection prematurely.
177+
reader
178+
.drain()
179+
.await
180+
.err_tip(|| "In FastFlowStore::update")?;
181+
return Ok(());
182+
}
183+
if ignore_slow {
160184
return self.fast_store.update(key, reader, size_info).await;
161185
}
162-
let fast_store = self.fast_store.inner_store(Some(key.borrow()));
163-
if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
186+
if ignore_fast {
164187
return self.slow_store.update(key, reader, size_info).await;
165188
}
166189

@@ -233,7 +256,10 @@ impl StoreDriver for FastSlowStore {
233256
{
234257
if !self
235258
.slow_store
259+
.inner_store(Some(key.borrow()))
236260
.optimized_for(StoreOptimizations::NoopUpdates)
261+
&& self.slow_direction != StoreDirection::ReadOnly
262+
&& self.slow_direction != StoreDirection::Get
237263
{
238264
slow_update_store_with_file(
239265
self.slow_store.as_store_driver_pin(),
@@ -244,6 +270,11 @@ impl StoreDriver for FastSlowStore {
244270
.await
245271
.err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
246272
}
273+
if self.fast_direction == StoreDirection::ReadOnly
274+
|| self.fast_direction == StoreDirection::Get
275+
{
276+
return Ok(Some(file));
277+
}
247278
return self
248279
.fast_store
249280
.update_with_whole_file(key, file, upload_size)
@@ -254,10 +285,13 @@ impl StoreDriver for FastSlowStore {
254285
.slow_store
255286
.optimized_for(StoreOptimizations::FileUpdates)
256287
{
257-
if !self
288+
let ignore_fast = self
258289
.fast_store
290+
.inner_store(Some(key.borrow()))
259291
.optimized_for(StoreOptimizations::NoopUpdates)
260-
{
292+
|| self.fast_direction == StoreDirection::ReadOnly
293+
|| self.fast_direction == StoreDirection::Get;
294+
if !ignore_fast {
261295
slow_update_store_with_file(
262296
self.fast_store.as_store_driver_pin(),
263297
key.borrow(),
@@ -267,6 +301,11 @@ impl StoreDriver for FastSlowStore {
267301
.await
268302
.err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
269303
}
304+
let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
305+
|| self.slow_direction == StoreDirection::Get;
306+
if ignore_slow {
307+
return Ok(Some(file));
308+
}
270309
return self
271310
.slow_store
272311
.update_with_whole_file(key, file, upload_size)
@@ -317,6 +356,22 @@ impl StoreDriver for FastSlowStore {
317356
.slow_store_hit_count
318357
.fetch_add(1, Ordering::Acquire);
319358

359+
if self
360+
.fast_store
361+
.inner_store(Some(key.borrow()))
362+
.optimized_for(StoreOptimizations::NoopUpdates)
363+
|| self.fast_direction == StoreDirection::ReadOnly
364+
|| self.fast_direction == StoreDirection::Update
365+
{
366+
self.slow_store
367+
.get_part(key, writer.borrow_mut(), offset, length)
368+
.await?;
369+
self.metrics
370+
.slow_store_downloaded_bytes
371+
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
372+
return Ok(());
373+
}
374+
320375
let send_range = offset..length.map_or(u64::MAX, |length| length + offset);
321376
let mut bytes_received: u64 = 0;
322377

nativelink-store/tests/fast_slow_store_test.rs

+109-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex};
1818

1919
use async_trait::async_trait;
2020
use bytes::Bytes;
21-
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreSpec};
21+
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec};
2222
use nativelink_error::{make_err, Code, Error, ResultExt};
2323
use nativelink_macro::nativelink_test;
2424
use nativelink_metric::MetricsComponent;
@@ -35,20 +35,29 @@ use rand::{Rng, SeedableRng};
3535

3636
const MEGABYTE_SZ: usize = 1024 * 1024;
3737

38-
fn make_stores() -> (Store, Store, Store) {
38+
fn make_stores_direction(
39+
fast_direction: StoreDirection,
40+
slow_direction: StoreDirection,
41+
) -> (Store, Store, Store) {
3942
let fast_store = Store::new(MemoryStore::new(&MemorySpec::default()));
4043
let slow_store = Store::new(MemoryStore::new(&MemorySpec::default()));
4144
let fast_slow_store = Store::new(FastSlowStore::new(
4245
&FastSlowSpec {
4346
fast: StoreSpec::memory(MemorySpec::default()),
47+
fast_direction,
4448
slow: StoreSpec::memory(MemorySpec::default()),
49+
slow_direction,
4550
},
4651
fast_store.clone(),
4752
slow_store.clone(),
4853
));
4954
(fast_slow_store, fast_store, slow_store)
5055
}
5156

57+
fn make_stores() -> (Store, Store, Store) {
58+
make_stores_direction(StoreDirection::default(), StoreDirection::default())
59+
}
60+
5261
fn make_random_data(sz: usize) -> Vec<u8> {
5362
let mut value = vec![0u8; sz];
5463
let mut rng = SmallRng::seed_from_u64(1);
@@ -331,7 +340,9 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> {
331340
let fast_slow_store = FastSlowStore::new(
332341
&FastSlowSpec {
333342
fast: StoreSpec::memory(MemorySpec::default()),
343+
fast_direction: StoreDirection::default(),
334344
slow: StoreSpec::memory(MemorySpec::default()),
345+
slow_direction: StoreDirection::default(),
335346
},
336347
fast_store,
337348
slow_store,
@@ -372,7 +383,9 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> {
372383
let fast_slow_store = Arc::new(FastSlowStore::new(
373384
&FastSlowSpec {
374385
fast: StoreSpec::memory(MemorySpec::default()),
386+
fast_direction: StoreDirection::default(),
375387
slow: StoreSpec::memory(MemorySpec::default()),
388+
slow_direction: StoreDirection::default(),
376389
},
377390
fast_store.clone(),
378391
slow_store,
@@ -395,7 +408,9 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> {
395408
let slow_store = Store::new(NoopStore::new());
396409
let fast_slow_store_config = FastSlowSpec {
397410
fast: StoreSpec::memory(MemorySpec::default()),
411+
fast_direction: StoreDirection::default(),
398412
slow: StoreSpec::noop(NoopSpec::default()),
413+
slow_direction: StoreDirection::default(),
399414
};
400415
let fast_slow_store = Arc::new(FastSlowStore::new(
401416
&fast_slow_store_config,
@@ -430,3 +445,95 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> {
430445
);
431446
Ok(())
432447
}
448+
449+
#[nativelink_test]
450+
async fn fast_get_only_not_updated() -> Result<(), Error> {
451+
let (fast_slow_store, fast_store, slow_store) =
452+
make_stores_direction(StoreDirection::Get, StoreDirection::Both);
453+
let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
454+
fast_slow_store
455+
.update_oneshot(digest, make_random_data(100).into())
456+
.await?;
457+
assert!(
458+
fast_store.has(digest).await?.is_none(),
459+
"Expected data to not be in the fast store"
460+
);
461+
assert!(
462+
slow_store.has(digest).await?.is_some(),
463+
"Expected data in the slow store"
464+
);
465+
Ok(())
466+
}
467+
468+
#[nativelink_test]
469+
async fn fast_readonly_only_not_updated() -> Result<(), Error> {
470+
let (fast_slow_store, fast_store, slow_store) =
471+
make_stores_direction(StoreDirection::ReadOnly, StoreDirection::Both);
472+
let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
473+
fast_slow_store
474+
.update_oneshot(digest, make_random_data(100).into())
475+
.await?;
476+
assert!(
477+
fast_store.has(digest).await?.is_none(),
478+
"Expected data to not be in the fast store"
479+
);
480+
assert!(
481+
slow_store.has(digest).await?.is_some(),
482+
"Expected data in the slow store"
483+
);
484+
Ok(())
485+
}
486+
487+
#[nativelink_test]
488+
async fn slow_readonly_only_not_updated() -> Result<(), Error> {
489+
let (fast_slow_store, fast_store, slow_store) =
490+
make_stores_direction(StoreDirection::Both, StoreDirection::ReadOnly);
491+
let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
492+
fast_slow_store
493+
.update_oneshot(digest, make_random_data(100).into())
494+
.await?;
495+
assert!(
496+
fast_store.has(digest).await?.is_some(),
497+
"Expected data to be in the fast store"
498+
);
499+
assert!(
500+
slow_store.has(digest).await?.is_none(),
501+
"Expected data to not be in the slow store"
502+
);
503+
Ok(())
504+
}
505+
506+
#[nativelink_test]
507+
async fn slow_get_only_not_updated() -> Result<(), Error> {
508+
let (fast_slow_store, fast_store, slow_store) =
509+
make_stores_direction(StoreDirection::Both, StoreDirection::Get);
510+
let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
511+
fast_slow_store
512+
.update_oneshot(digest, make_random_data(100).into())
513+
.await?;
514+
assert!(
515+
fast_store.has(digest).await?.is_some(),
516+
"Expected data to be in the fast store"
517+
);
518+
assert!(
519+
slow_store.has(digest).await?.is_none(),
520+
"Expected data to not be in the slow store"
521+
);
522+
Ok(())
523+
}
524+
525+
#[nativelink_test]
526+
async fn fast_put_only_not_updated() -> Result<(), Error> {
527+
let (fast_slow_store, fast_store, slow_store) =
528+
make_stores_direction(StoreDirection::Update, StoreDirection::Both);
529+
let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
530+
slow_store
531+
.update_oneshot(digest, make_random_data(100).into())
532+
.await?;
533+
let _ = fast_slow_store.get_part_unchunked(digest, 0, None).await;
534+
assert!(
535+
fast_store.has(digest).await?.is_none(),
536+
"Expected data to not be in the fast store"
537+
);
538+
Ok(())
539+
}

nativelink-store/tests/filesystem_store_test.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use filetime::{set_file_atime, FileTime};
2727
use futures::executor::block_on;
2828
use futures::task::Poll;
2929
use futures::{poll, Future, FutureExt};
30-
use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
30+
use nativelink_config::stores::{
31+
FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec,
32+
};
3133
use nativelink_error::{make_err, Code, Error, ResultExt};
3234
use nativelink_macro::nativelink_test;
3335
use nativelink_store::fast_slow_store::FastSlowStore;
@@ -1333,7 +1335,9 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result<
13331335
// Note: The config is not needed for this test, so use dummy data.
13341336
&FastSlowSpec {
13351337
fast: StoreSpec::memory(MemorySpec::default()),
1338+
fast_direction: StoreDirection::default(),
13361339
slow: StoreSpec::memory(MemorySpec::default()),
1340+
slow_direction: StoreDirection::default(),
13371341
},
13381342
Store::new(
13391343
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {

0 commit comments

Comments
 (0)