Skip to content

Commit 6d807b8

Browse files
authored
Event handlers are now idempotent. (#91)
1 parent 894212c commit 6d807b8

File tree

9 files changed

+520
-169
lines changed

9 files changed

+520
-169
lines changed

CHANGELOG.md

+5-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10-
Nothing yet.
10+
### Fixed
11+
12+
- Event handlers are now idempotent.
1113

1214
## [3.0.4] - 2024-12-27
1315

@@ -67,7 +69,8 @@ Nothing yet.
6769
- Relational storage (PostgreSQL and Microsoft SQL Server) for Identity entities.
6870
- Unit and Integration tests.
6971

70-
[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.4...HEAD
72+
[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.5...HEAD
73+
[3.0.5]: https://github.com/Logitar/Identity/compare/v3.0.4...v3.0.5
7174
[3.0.4]: https://github.com/Logitar/Identity/compare/v3.0.3...v3.0.4
7275
[3.0.3]: https://github.com/Logitar/Identity/compare/v3.0.2...v3.0.3
7376
[3.0.2]: https://github.com/Logitar/Identity/compare/v3.0.1...v3.0.2

lib/Logitar.Identity.EntityFrameworkCore.Relational/Entities/UserEntity.cs

+4
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ public void Enable(UserEnabled @event)
198198

199199
public void RemoveCustomIdentifier(UserIdentifierRemoved @event)
200200
{
201+
Update(@event);
202+
201203
UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value);
202204
if (identifier != null)
203205
{
@@ -241,6 +243,8 @@ public void SetAddress(UserAddressChanged @event)
241243

242244
public void SetCustomIdentifier(UserIdentifierChanged @event)
243245
{
246+
Update(@event);
247+
244248
UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value);
245249
if (identifier == null)
246250
{

lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/ApiKeyEvents.cs

+75-27
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,39 @@ public sealed class ApiKeyEvents : INotificationHandler<ApiKeyAuthenticated>,
1414
{
1515
private readonly IdentityContext _context;
1616
private readonly ICustomAttributeService _customAttributes;
17+
private readonly IMediator _mediator;
1718

18-
public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes)
19+
public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator)
1920
{
2021
_context = context;
2122
_customAttributes = customAttributes;
23+
_mediator = mediator;
2224
}
2325

2426
public async Task Handle(ApiKeyAuthenticated @event, CancellationToken cancellationToken)
2527
{
26-
ApiKeyEntity apiKey = await _context.ApiKeys
27-
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
28-
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
28+
ApiKeyEntity? apiKey = await _context.ApiKeys
29+
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
2930

30-
apiKey.Authenticate(@event);
31+
if (apiKey == null || apiKey.Version != (@event.Version - 1))
32+
{
33+
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
34+
}
35+
else
36+
{
37+
apiKey.Authenticate(@event);
3138

32-
await _context.SaveChangesAsync(cancellationToken);
39+
await _context.SaveChangesAsync(cancellationToken);
40+
41+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
42+
}
3343
}
3444

3545
public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationToken)
3646
{
3747
ApiKeyEntity? apiKey = await _context.ApiKeys.AsNoTracking()
3848
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
49+
3950
if (apiKey == null)
4051
{
4152
apiKey = new(@event);
@@ -44,61 +55,98 @@ public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationTok
4455

4556
await SaveActorAsync(apiKey, cancellationToken);
4657
await _context.SaveChangesAsync(cancellationToken);
58+
59+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
60+
}
61+
else
62+
{
63+
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
4764
}
4865
}
4966

5067
public async Task Handle(ApiKeyDeleted @event, CancellationToken cancellationToken)
5168
{
5269
ApiKeyEntity? apiKey = await _context.ApiKeys
5370
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
54-
if (apiKey != null)
71+
72+
if (apiKey == null)
73+
{
74+
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
75+
}
76+
else
5577
{
5678
_context.ApiKeys.Remove(apiKey);
5779

5880
await DeleteActorAsync(apiKey, cancellationToken);
5981
await _customAttributes.RemoveAsync(EntityType.ApiKey, apiKey.ApiKeyId, cancellationToken);
6082
await _context.SaveChangesAsync(cancellationToken);
83+
84+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
6185
}
6286
}
6387

6488
public async Task Handle(ApiKeyRoleAdded @event, CancellationToken cancellationToken)
6589
{
66-
ApiKeyEntity apiKey = await _context.ApiKeys
67-
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
68-
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
90+
ApiKeyEntity? apiKey = await _context.ApiKeys
91+
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
92+
93+
if (apiKey == null || apiKey.Version != (@event.Version - 1))
94+
{
95+
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
96+
}
97+
else
98+
{
99+
RoleEntity role = await _context.Roles
100+
.SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken)
101+
?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found.");
69102

70-
RoleEntity role = await _context.Roles
71-
.SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken)
72-
?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found.");
103+
apiKey.AddRole(role, @event);
73104

74-
apiKey.AddRole(role, @event);
105+
await _context.SaveChangesAsync(cancellationToken);
75106

76-
await _context.SaveChangesAsync(cancellationToken);
107+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
108+
}
77109
}
78110

79111
public async Task Handle(ApiKeyRoleRemoved @event, CancellationToken cancellationToken)
80112
{
81-
ApiKeyEntity apiKey = await _context.ApiKeys
113+
ApiKeyEntity? apiKey = await _context.ApiKeys
82114
.Include(x => x.Roles)
83-
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
84-
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
115+
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
85116

86-
apiKey.RemoveRole(@event);
117+
if (apiKey == null || apiKey.Version != (@event.Version - 1))
118+
{
119+
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
120+
}
121+
else
122+
{
123+
apiKey.RemoveRole(@event);
124+
125+
await _context.SaveChangesAsync(cancellationToken);
87126

88-
await _context.SaveChangesAsync(cancellationToken);
127+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
128+
}
89129
}
90130

91131
public async Task Handle(ApiKeyUpdated @event, CancellationToken cancellationToken)
92132
{
93-
ApiKeyEntity apiKey = await _context.ApiKeys
94-
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
95-
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
133+
ApiKeyEntity? apiKey = await _context.ApiKeys
134+
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
135+
136+
if (apiKey == null || apiKey.Version != (@event.Version - 1))
137+
{
138+
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
139+
}
140+
else
141+
{
142+
apiKey.Update(@event);
96143

97-
apiKey.Update(@event);
144+
await SaveActorAsync(apiKey, cancellationToken);
145+
await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken);
146+
await _context.SaveChangesAsync(cancellationToken);
98147

99-
await SaveActorAsync(apiKey, cancellationToken);
100-
await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken);
101-
await _context.SaveChangesAsync(cancellationToken);
148+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
149+
}
102150
}
103151

104152
private async Task DeleteActorAsync(ApiKeyEntity apiKey, CancellationToken cancellationToken)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
using Logitar.EventSourcing;
2+
using MediatR;
3+
4+
namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers;
5+
6+
public record EventHandled(DomainEvent Event) : INotification;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using Logitar.EventSourcing;
2+
using Logitar.Identity.EntityFrameworkCore.Relational.Entities;
3+
using MediatR;
4+
5+
namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers;
6+
7+
public record EventNotHandled : INotification
8+
{
9+
public long ExpectedVersion { get; }
10+
public long ActualVersion { get; }
11+
12+
public EventNotHandled(long expectedVersion, long actualVersion)
13+
{
14+
ArgumentOutOfRangeException.ThrowIfNegative(expectedVersion);
15+
ArgumentOutOfRangeException.ThrowIfNegative(actualVersion);
16+
17+
ExpectedVersion = expectedVersion;
18+
ActualVersion = actualVersion;
19+
}
20+
21+
public EventNotHandled(DomainEvent @event, AggregateEntity? aggregate)
22+
{
23+
ExpectedVersion = @event.Version - 1;
24+
ActualVersion = aggregate?.Version ?? 0;
25+
}
26+
}

lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/OneTimePasswordEvents.cs

+58-18
Original file line numberDiff line numberDiff line change
@@ -13,71 +13,111 @@ public sealed class OneTimePasswordEvents : INotificationHandler<OneTimePassword
1313
{
1414
private readonly IdentityContext _context;
1515
private readonly ICustomAttributeService _customAttributes;
16+
private readonly IMediator _mediator;
1617

17-
public OneTimePasswordEvents(IdentityContext context, ICustomAttributeService customAttributes)
18+
public OneTimePasswordEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator)
1819
{
1920
_context = context;
2021
_customAttributes = customAttributes;
22+
_mediator = mediator;
2123
}
2224

2325
public async Task Handle(OneTimePasswordCreated @event, CancellationToken cancellationToken)
2426
{
2527
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords.AsNoTracking()
2628
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
29+
2730
if (oneTimePassword == null)
2831
{
2932
oneTimePassword = new(@event);
3033

3134
_context.OneTimePasswords.Add(oneTimePassword);
3235

3336
await _context.SaveChangesAsync(cancellationToken);
37+
38+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
39+
}
40+
else
41+
{
42+
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
3443
}
3544
}
3645

3746
public async Task Handle(OneTimePasswordDeleted @event, CancellationToken cancellationToken)
3847
{
3948
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
4049
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
41-
if (oneTimePassword != null)
50+
51+
if (oneTimePassword == null)
52+
{
53+
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
54+
}
55+
else
4256
{
4357
_context.OneTimePasswords.Remove(oneTimePassword);
4458

4559
await _customAttributes.RemoveAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, cancellationToken);
4660
await _context.SaveChangesAsync(cancellationToken);
61+
62+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
4763
}
4864
}
4965

5066
public async Task Handle(OneTimePasswordUpdated @event, CancellationToken cancellationToken)
5167
{
52-
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
53-
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
54-
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
68+
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
69+
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
70+
71+
if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
72+
{
73+
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
74+
}
75+
else
76+
{
77+
oneTimePassword.Update(@event);
5578

56-
oneTimePassword.Update(@event);
79+
await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken);
80+
await _context.SaveChangesAsync(cancellationToken);
5781

58-
await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken);
59-
await _context.SaveChangesAsync(cancellationToken);
82+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
83+
}
6084
}
6185

6286
public async Task Handle(OneTimePasswordValidationFailed @event, CancellationToken cancellationToken)
6387
{
64-
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
65-
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
66-
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
88+
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
89+
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
90+
91+
if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
92+
{
93+
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
94+
}
95+
else
96+
{
97+
oneTimePassword.Fail(@event);
6798

68-
oneTimePassword.Fail(@event);
99+
await _context.SaveChangesAsync(cancellationToken);
69100

70-
await _context.SaveChangesAsync(cancellationToken);
101+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
102+
}
71103
}
72104

73105
public async Task Handle(OneTimePasswordValidationSucceeded @event, CancellationToken cancellationToken)
74106
{
75-
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
76-
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
77-
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
107+
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
108+
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
109+
110+
if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
111+
{
112+
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
113+
}
114+
else
115+
{
116+
oneTimePassword.Succeed(@event);
78117

79-
oneTimePassword.Succeed(@event);
118+
await _context.SaveChangesAsync(cancellationToken);
80119

81-
await _context.SaveChangesAsync(cancellationToken);
120+
await _mediator.Publish(new EventHandled(@event), cancellationToken);
121+
}
82122
}
83123
}

0 commit comments

Comments
 (0)