Skip to content

Forward propagation control of cache entries #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/CacheTower.Extensions.Redis/RedisLockExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void Register(ICacheStack cacheStack)
RegisteredStack = cacheStack;
}

public async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<ValueTask<CacheEntry<T>>> valueProvider, CacheSettings settings)
public async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<ValueTask<CacheEntry<T>>> valueProvider, CacheEntryLifetime settings)
{
var hasLock = await Database.StringSetAsync(cacheKey, RedisValue.EmptyString, expiry: LockTimeout, when: When.NotExists);

Expand All @@ -82,7 +82,7 @@ public async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func
}
}

private async Task<CacheEntry<T>> WaitForResult<T>(string cacheKey, CacheSettings settings)
private async Task<CacheEntry<T>> WaitForResult<T>(string cacheKey, CacheEntryLifetime settings)
{
var delayedResultSource = new TaskCompletionSource<bool>();
var waitList = new[] { delayedResultSource };
Expand Down
14 changes: 12 additions & 2 deletions src/CacheTower/CacheEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@ namespace CacheTower
{
public abstract class CacheEntry
{
/// <summary>
/// The absolute expiry date of the <see cref="CacheEntry"/>.
/// </summary>
public DateTime Expiry { get; }
/// <summary>
/// The number of in-memory cache hits the <see cref="CacheEntry"/> has had.
/// </summary>
public int CacheHitCount => _CacheHitCount;

internal int _CacheHitCount;
internal bool _HasBeenForwardPropagated;

protected CacheEntry(DateTime expiry)
{
Expand All @@ -18,9 +28,9 @@ protected CacheEntry(DateTime expiry)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public DateTime GetStaleDate(CacheSettings cacheSettings)
public DateTime GetStaleDate(CacheEntryLifetime entryLifetime)
{
return Expiry - cacheSettings.TimeToLive + cacheSettings.StaleAfter;
return Expiry - entryLifetime.TimeToLive + entryLifetime.StaleAfter;
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/CacheTower/CacheEntryLifetime.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace CacheTower
{
public struct CacheEntryLifetime
{
public TimeSpan TimeToLive { get; }
public TimeSpan StaleAfter { get; }

public CacheEntryLifetime(TimeSpan timeToLive)
{
TimeToLive = timeToLive;
StaleAfter = TimeSpan.Zero;
}

public CacheEntryLifetime(TimeSpan timeToLive, TimeSpan staleAfter)
{
TimeToLive = timeToLive;
StaleAfter = staleAfter;
}
}
}
19 changes: 5 additions & 14 deletions src/CacheTower/CacheSettings.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using CacheTower.Providers.Memory;

namespace CacheTower
{
public struct CacheSettings
{
public TimeSpan TimeToLive { get; }
public TimeSpan StaleAfter { get; }

public CacheSettings(TimeSpan timeToLive)
{
TimeToLive = timeToLive;
StaleAfter = TimeSpan.Zero;
}

public CacheSettings(TimeSpan timeToLive, TimeSpan staleAfter)
{
TimeToLive = timeToLive;
StaleAfter = staleAfter;
}
/// <summary>
/// The number of cache hits before forward propagating from a <see cref="MemoryCacheLayer" /> to higher level caches.
/// </summary>
public uint ForwardPropagateAfterXCacheHits { get; set; }
}
}
159 changes: 102 additions & 57 deletions src/CacheTower/CacheStack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using CacheTower.Extensions;
using CacheTower.Providers.Memory;
using Microsoft.Extensions.DependencyInjection;

namespace CacheTower
Expand Down Expand Up @@ -89,17 +90,17 @@ public async ValueTask EvictAsync(string cacheKey)
}
}

public async ValueTask<CacheEntry<T>> SetAsync<T>(string cacheKey, T value, TimeSpan timeToLive)
public async ValueTask<CacheEntry<T>> SetAsync<T>(string cacheKey, T value, TimeSpan timeToLive, CacheSettings settings = default)
{
ThrowIfDisposed();

var expiry = DateTime.UtcNow + timeToLive;
var cacheEntry = new CacheEntry<T>(value, expiry);
await SetAsync(cacheKey, cacheEntry);
await SetAsync(cacheKey, cacheEntry, settings);
return cacheEntry;
}

public async ValueTask SetAsync<T>(string cacheKey, CacheEntry<T> cacheEntry)
public async ValueTask SetAsync<T>(string cacheKey, CacheEntry<T> cacheEntry, CacheSettings settings = default)
{
ThrowIfDisposed();

Expand All @@ -113,16 +114,23 @@ public async ValueTask SetAsync<T>(string cacheKey, CacheEntry<T> cacheEntry)
throw new ArgumentNullException(nameof(cacheEntry));
}

for (int i = 0, l = CacheLayers.Length; i < l; i++)
if (settings.ForwardPropagateAfterXCacheHits > 0 && CacheLayers[0] is MemoryCacheLayer memoryCacheLayer)
{
var layer = CacheLayers[i];
if (layer is ISyncCacheLayer syncLayerOne)
{
syncLayerOne.Set(cacheKey, cacheEntry);
}
else
memoryCacheLayer.Set(cacheKey, cacheEntry);
}
else
{
for (int i = 0, l = CacheLayers.Length; i < l; i++)
{
await (layer as IAsyncCacheLayer).SetAsync(cacheKey, cacheEntry);
var layer = CacheLayers[i];
if (layer is ISyncCacheLayer syncLayerOne)
{
syncLayerOne.Set(cacheKey, cacheEntry);
}
else
{
await (layer as IAsyncCacheLayer).SetAsync(cacheKey, cacheEntry);
}
}
}
}
Expand Down Expand Up @@ -201,7 +209,7 @@ public async ValueTask<CacheEntry<T>> GetAsync<T>(string cacheKey)
return default;
}

public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheSettings settings)
public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheEntryLifetime entryLifetime, CacheSettings settings = default)
{
ThrowIfDisposed();

Expand All @@ -220,13 +228,14 @@ public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> get
{
var cacheEntry = cacheEntryPoint.CacheEntry;
var currentTime = DateTime.UtcNow;
if (cacheEntry.GetStaleDate(settings) < currentTime)
var isStale = cacheEntry.GetStaleDate(entryLifetime) < currentTime;
if (isStale)
{
if (cacheEntry.Expiry < currentTime)
{
//Refresh the value in the current thread though short circuit if we're unable to establish a lock
//If the lock isn't established, it will instead use the stale cache entry (even if past the allowed stale period)
var refreshedCacheEntry = await RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: false);
var refreshedCacheEntry = await RefreshValueAsync(cacheKey, getter, entryLifetime, settings, waitForRefresh: false);
if (refreshedCacheEntry != default)
{
cacheEntry = refreshedCacheEntry;
Expand All @@ -235,62 +244,56 @@ public async ValueTask<T> GetOrSetAsync<T>(string cacheKey, Func<T, Task<T>> get
else
{
//Refresh the value in the background
_ = RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: false);
_ = RefreshValueAsync(cacheKey, getter, entryLifetime, settings, waitForRefresh: false);
}
}
else if (cacheEntryPoint.LayerIndex > 0)
else
{
//If a lower-level cache is missing the latest data, attempt to set it in the background
_ = BackPopulateCacheAsync(cacheEntryPoint.LayerIndex, cacheKey, cacheEntry);
if (cacheEntryPoint.LayerIndex > 0)
{
//If a lower-level cache (eg. a memory cache) is missing the latest data, attempt to set it in the background
_ = BackPropagateCacheEntryAsync(cacheEntryPoint.LayerIndex, cacheKey, cacheEntry);
}
else if (!cacheEntry._HasBeenForwardPropagated && settings.ForwardPropagateAfterXCacheHits > 0 && cacheEntry.CacheHitCount >= settings.ForwardPropagateAfterXCacheHits)
{
//If enabled, we push the local cache entry to higher-level caches, doing so in the background
_ = ForwardPropagateCacheEntryAsync(cacheEntryPoint.LayerIndex + 1, cacheKey, cacheEntry);
}
}

return cacheEntry.Value;
}
else
{
//Refresh the value in the current thread though because we have no old cache value, we have to lock and wait
return (await RefreshValueAsync(cacheKey, getter, settings, waitForRefresh: true)).Value;
return (await RefreshValueAsync(cacheKey, getter, entryLifetime, settings, waitForRefresh: true)).Value;
}
}

private async ValueTask BackPopulateCacheAsync<T>(int fromIndexExclusive, string cacheKey, CacheEntry<T> cacheEntry)
private async ValueTask BackPropagateCacheEntryAsync<T>(int fromIndexExclusive, string cacheKey, CacheEntry<T> cacheEntry)
{
ThrowIfDisposed();

var hasLock = false;
lock (WaitingKeyRefresh)
{
#if NETSTANDARD2_0
hasLock = !WaitingKeyRefresh.ContainsKey(cacheKey);
if (hasLock)
{
WaitingKeyRefresh[cacheKey] = Array.Empty<TaskCompletionSource<object>>();
}
#elif NETSTANDARD2_1
hasLock = WaitingKeyRefresh.TryAdd(cacheKey, Array.Empty<TaskCompletionSource<object>>());
#endif
}

if (hasLock)
if (TryGetKeyRefreshLock(cacheKey))
{
try
{
for (; --fromIndexExclusive >= 0;)
{
var previousLayer = CacheLayers[fromIndexExclusive];
if (previousLayer is ISyncCacheLayer prevSyncLayer)
var cacheLayer = CacheLayers[fromIndexExclusive];
if (cacheLayer is ISyncCacheLayer syncLayer)
{
if (prevSyncLayer.IsAvailable(cacheKey))
if (syncLayer.IsAvailable(cacheKey))
{
prevSyncLayer.Set(cacheKey, cacheEntry);
syncLayer.Set(cacheKey, cacheEntry);
}
}
else
{
var prevAsyncLayer = previousLayer as IAsyncCacheLayer;
if (await prevAsyncLayer.IsAvailableAsync(cacheKey))
var asyncCacheLayer = cacheLayer as IAsyncCacheLayer;
if (await asyncCacheLayer.IsAvailableAsync(cacheKey))
{
await prevAsyncLayer.SetAsync(cacheKey, cacheEntry);
await asyncCacheLayer.SetAsync(cacheKey, cacheEntry);
}
}
}
Expand All @@ -302,25 +305,48 @@ private async ValueTask BackPopulateCacheAsync<T>(int fromIndexExclusive, string
}
}

private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheSettings settings, bool waitForRefresh)
private async ValueTask ForwardPropagateCacheEntryAsync<T>(int fromIndexExclusive, string cacheKey, CacheEntry<T> cacheEntry)
{
ThrowIfDisposed();

var hasLock = false;
lock (WaitingKeyRefresh)
if (TryGetKeyRefreshLock(cacheKey) && cacheEntry._HasBeenForwardPropagated)
{
#if NETSTANDARD2_0
hasLock = !WaitingKeyRefresh.ContainsKey(cacheKey);
if (hasLock)
try
{
WaitingKeyRefresh[cacheKey] = Array.Empty<TaskCompletionSource<object>>();
for (; ++fromIndexExclusive < CacheLayers.Length;)
{
var cacheLayer = CacheLayers[fromIndexExclusive];
if (cacheLayer is ISyncCacheLayer syncLayer)
{
if (syncLayer.IsAvailable(cacheKey))
{
syncLayer.Set(cacheKey, cacheEntry);
}
}
else
{
var asyncCacheLayer = cacheLayer as IAsyncCacheLayer;
if (await asyncCacheLayer.IsAvailableAsync(cacheKey))
{
await asyncCacheLayer.SetAsync(cacheKey, cacheEntry);
}
}
}

cacheEntry._HasBeenForwardPropagated = true;
}
finally
{
UnlockWaitingTasks(cacheKey, cacheEntry);
}
#elif NETSTANDARD2_1
hasLock = WaitingKeyRefresh.TryAdd(cacheKey, Array.Empty<TaskCompletionSource<object>>());
#endif
}
}

if (hasLock)
private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Func<T, Task<T>> getter, CacheEntryLifetime entryLifetime, CacheSettings settings, bool waitForRefresh)
{
ThrowIfDisposed();

if (TryGetKeyRefreshLock(cacheKey))
{
try
{
Expand All @@ -335,14 +361,14 @@ private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Fun
}

var value = await getter(oldValue);
var refreshedEntry = await SetAsync(cacheKey, value, settings.TimeToLive);
var refreshedEntry = await SetAsync(cacheKey, value, entryLifetime.TimeToLive, settings);

_ = Extensions.OnValueRefreshAsync(cacheKey, settings.TimeToLive);
_ = Extensions.OnValueRefreshAsync(cacheKey, entryLifetime.TimeToLive);

UnlockWaitingTasks(cacheKey, refreshedEntry);

return refreshedEntry;
}, settings);
}, entryLifetime);
}
catch
{
Expand All @@ -369,7 +395,7 @@ private async ValueTask<CacheEntry<T>> RefreshValueAsync<T>(string cacheKey, Fun

//Last minute check to confirm whether waiting is required
var currentEntry = await GetAsync<T>(cacheKey);
if (currentEntry != null && currentEntry.GetStaleDate(settings) > DateTime.UtcNow)
if (currentEntry != null && currentEntry.GetStaleDate(entryLifetime) > DateTime.UtcNow)
{
UnlockWaitingTasks(cacheKey, currentEntry);
return currentEntry;
Expand Down Expand Up @@ -400,6 +426,25 @@ private void UnlockWaitingTasks(string cacheKey, CacheEntry cacheEntry)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool TryGetKeyRefreshLock(string cacheKey)
{
var hasLock = false;
lock (WaitingKeyRefresh)
{
#if NETSTANDARD2_0
hasLock = !WaitingKeyRefresh.ContainsKey(cacheKey);
if (hasLock)
{
WaitingKeyRefresh[cacheKey] = Array.Empty<TaskCompletionSource<object>>();
}
#elif NETSTANDARD2_1
hasLock = WaitingKeyRefresh.TryAdd(cacheKey, Array.Empty<TaskCompletionSource<object>>());
#endif
}
return hasLock;
}

#if NETSTANDARD2_0
public void Dispose()
{
Expand Down
Loading