Skip to content

Commit 8a14c67

Browse files
wcrooyWouter CrooyKralizek
authored
Add support for parallel execution (#16)
* Make sure that that SNS & SQS Handler are using scoped. So after usage scoped objects are disposed. * Async handling & Concurrent handling in for each loop * Added unit tests and full support to concurrently handle multiple records at the same time. Co-authored-by: Wouter Crooy <[email protected]> Co-authored-by: Renato Golia <[email protected]>
1 parent 662bb12 commit 8a14c67

16 files changed

+988
-18
lines changed

AWSLambdaSharpTemplate.sln

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,29 @@
1-
Microsoft Visual Studio Solution File, Format Version 12.00
2-
# Visual Studio 15
3-
VisualStudioVersion = 15.0.26403.3
1+
Microsoft Visual Studio Solution File, Format Version 12.00
2+
# Visual Studio Version 16
3+
VisualStudioVersion = 16.0.29926.136
44
MinimumVisualStudioVersion = 15.0.26124.0
55
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{42BE6308-5AC6-40B3-96DA-1FF015AF533E}"
66
EndProject
77
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kralizek.Lambda.Template", "src\Kralizek.Lambda.Template\Kralizek.Lambda.Template.csproj", "{9DDBE0B9-6C3C-4A87-89B2-D4BA074BE90E}"
88
EndProject
99
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{A973E126-E744-44E4-A27C-6F63993AC642}"
1010
EndProject
11-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tests.Lambda.Template", "tests\Tests.Lambda.Template\Tests.Lambda.Template.csproj", "{D0263D83-E1A0-46EB-9B82-9793A4D89590}"
11+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests.Lambda.Template", "tests\Tests.Lambda.Template\Tests.Lambda.Template.csproj", "{D0263D83-E1A0-46EB-9B82-9793A4D89590}"
1212
EndProject
1313
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{B9F8C8E0-047D-4F38-8F7D-E64F39E51988}"
1414
EndProject
1515
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFunction", "samples\EventFunction\EventFunction.csproj", "{3783641F-A1E9-42A7-A7EE-99B8E8F4DB08}"
1616
EndProject
1717
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RequestResponseFunction", "samples\RequestResponseFunction\RequestResponseFunction.csproj", "{9FAE7942-AAB6-4A33-8751-F7973F928858}"
1818
EndProject
19-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kralizek.Lambda.Template.Sns", "src\Kralizek.Lambda.Template.Sns\Kralizek.Lambda.Template.Sns.csproj", "{8DFE9840-FEF4-4E39-89FD-80E968E3C057}"
19+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kralizek.Lambda.Template.Sns", "src\Kralizek.Lambda.Template.Sns\Kralizek.Lambda.Template.Sns.csproj", "{8DFE9840-FEF4-4E39-89FD-80E968E3C057}"
2020
EndProject
2121
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SnsEventFunction", "samples\SnsEventFunction\SnsEventFunction.csproj", "{08E76E38-D49D-4C15-8349-FC23EB37F0DF}"
2222
EndProject
2323
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kralizek.Lambda.Template.Sqs", "src\Kralizek.Lambda.Template.Sqs\Kralizek.Lambda.Template.Sqs.csproj", "{282BE189-9DE6-4A03-B5A4-ADBF4C45D21B}"
2424
EndProject
25+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SnsEventFunctionWithParallelism", "samples\SnsEventFunctionWithParallelism\SnsEventFunctionWithParallelism.csproj", "{3E986948-21D5-4035-AE12-F669FA488D17}"
26+
EndProject
2527
Global
2628
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2729
Debug|Any CPU = Debug|Any CPU
@@ -56,6 +58,10 @@ Global
5658
{282BE189-9DE6-4A03-B5A4-ADBF4C45D21B}.Debug|Any CPU.Build.0 = Debug|Any CPU
5759
{282BE189-9DE6-4A03-B5A4-ADBF4C45D21B}.Release|Any CPU.ActiveCfg = Release|Any CPU
5860
{282BE189-9DE6-4A03-B5A4-ADBF4C45D21B}.Release|Any CPU.Build.0 = Release|Any CPU
61+
{3E986948-21D5-4035-AE12-F669FA488D17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
62+
{3E986948-21D5-4035-AE12-F669FA488D17}.Debug|Any CPU.Build.0 = Debug|Any CPU
63+
{3E986948-21D5-4035-AE12-F669FA488D17}.Release|Any CPU.ActiveCfg = Release|Any CPU
64+
{3E986948-21D5-4035-AE12-F669FA488D17}.Release|Any CPU.Build.0 = Release|Any CPU
5965
EndGlobalSection
6066
GlobalSection(SolutionProperties) = preSolution
6167
HideSolutionNode = FALSE
@@ -68,5 +74,9 @@ Global
6874
{8DFE9840-FEF4-4E39-89FD-80E968E3C057} = {42BE6308-5AC6-40B3-96DA-1FF015AF533E}
6975
{08E76E38-D49D-4C15-8349-FC23EB37F0DF} = {B9F8C8E0-047D-4F38-8F7D-E64F39E51988}
7076
{282BE189-9DE6-4A03-B5A4-ADBF4C45D21B} = {42BE6308-5AC6-40B3-96DA-1FF015AF533E}
77+
{3E986948-21D5-4035-AE12-F669FA488D17} = {B9F8C8E0-047D-4F38-8F7D-E64F39E51988}
78+
EndGlobalSection
79+
GlobalSection(ExtensibilityGlobals) = postSolution
80+
SolutionGuid = {CE8B1DF0-D1A8-43E0-A6D5-305B4EF727B5}
7181
EndGlobalSection
7282
EndGlobal
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
using Amazon.Lambda.Core;
5+
using Amazon.Lambda.SNSEvents;
6+
using Kralizek.Lambda;
7+
using Microsoft.Extensions.Configuration;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Microsoft.Extensions.Logging;
10+
11+
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
12+
13+
namespace SnsEventFunction
14+
{
15+
public class Function : EventFunction<SNSEvent>
16+
{
17+
protected override void Configure(IConfigurationBuilder builder)
18+
{
19+
builder.AddEnvironmentVariables();
20+
}
21+
22+
protected override void ConfigureLogging(ILoggingBuilder logging, IExecutionEnvironment executionEnvironment)
23+
{
24+
logging.AddConfiguration(Configuration.GetSection("Logging"));
25+
26+
logging.AddLambdaLogger(new LambdaLoggerOptions
27+
{
28+
IncludeCategory = true,
29+
IncludeLogLevel = true,
30+
IncludeNewline = true
31+
});
32+
}
33+
34+
protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
35+
{
36+
services.ConfigureSnsParallelExecution(4);
37+
38+
services.UseNotificationHandler<CustomNotification, CustomNotificationHandler>(enableParallelExecution: true);
39+
}
40+
}
41+
42+
public class CustomNotification
43+
{
44+
public string Message { get; set; }
45+
}
46+
47+
public class CustomNotificationHandler : INotificationHandler<CustomNotification>
48+
{
49+
private readonly ILogger<CustomNotificationHandler> _logger;
50+
51+
public CustomNotificationHandler(ILogger<CustomNotificationHandler> logger)
52+
{
53+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
54+
}
55+
56+
public Task HandleAsync(CustomNotification notification, ILambdaContext context)
57+
{
58+
_logger.LogInformation($"Handling notification: {notification.Message}");
59+
60+
return Task.CompletedTask;
61+
}
62+
}
63+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netcoreapp3.1</TargetFramework>
5+
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
6+
<AWSProjectType>Lambda</AWSProjectType>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="Amazon.Lambda.Logging.AspNetCore" Version="2.1.0" />
11+
<PackageReference Include="Microsoft.Extensions.Logging.Configuration" Version="2.1.1" />
12+
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.1" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<ProjectReference Include="..\..\src\Kralizek.Lambda.Template.Sns\Kralizek.Lambda.Template.Sns.csproj" />
17+
</ItemGroup>
18+
19+
</Project>
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"Information" : [
3+
"This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
4+
"To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
5+
6+
"dotnet lambda help",
7+
8+
"All the command line options for the Lambda command can be specified in this file."
9+
],
10+
11+
"profile":"RG",
12+
"region" : "eu-west-1",
13+
"configuration" : "Release",
14+
"framework": "netcoreapp2.1",
15+
"function-runtime": "dotnetcore3.1",
16+
"function-name": "test-lambda-template",
17+
"function-memory-size" : 256,
18+
"function-timeout" : 30,
19+
"function-handler" : "SnsEventFunction::SnsEventFunction.Function::FunctionHandler"
20+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using System;
2+
using System.Linq;
3+
using System.Text.Json;
4+
using System.Threading.Tasks;
5+
using Amazon.Lambda.Core;
6+
using Amazon.Lambda.SNSEvents;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Logging;
9+
using Microsoft.Extensions.Options;
10+
11+
namespace Kralizek.Lambda
12+
{
13+
public class ParallelSnsExecutionOptions
14+
{
15+
public int MaxDegreeOfParallelism { get; set; } = System.Environment.ProcessorCount;
16+
}
17+
18+
public class ParallelSnsEventHandler<TNotification>: IEventHandler<SNSEvent> where TNotification : class
19+
{
20+
private readonly ILogger _logger;
21+
private readonly IServiceProvider _serviceProvider;
22+
private readonly ParallelSnsExecutionOptions _options;
23+
24+
public ParallelSnsEventHandler(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ParallelSnsExecutionOptions> options)
25+
{
26+
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
27+
_logger = loggerFactory?.CreateLogger("SnsForEachAsyncEventHandler") ?? throw new ArgumentNullException(nameof(loggerFactory));
28+
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
29+
}
30+
31+
public async Task HandleAsync(SNSEvent input, ILambdaContext context)
32+
{
33+
if (input.Records.Any())
34+
{
35+
await input.Records.ForEachAsync(_options.MaxDegreeOfParallelism, async record =>
36+
{
37+
using (var scope = _serviceProvider.CreateScope())
38+
{
39+
var message = record.Sns.Message;
40+
var notification = JsonSerializer.Deserialize<TNotification>(message);
41+
_logger.LogDebug($"Message received: {message}");
42+
43+
var messageHandler = scope.ServiceProvider.GetService<INotificationHandler<TNotification>>();
44+
if (messageHandler == null)
45+
{
46+
_logger.LogCritical($"No INotificationHandler<{typeof(TNotification).Name}> could be found.");
47+
throw new InvalidOperationException($"No INotificationHandler<{typeof(TNotification).Name}> could be found.");
48+
}
49+
50+
await messageHandler.HandleAsync(notification, context);
51+
}
52+
});
53+
}
54+
}
55+
}
56+
}
Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
1+
using System;
12
using Amazon.Lambda.SNSEvents;
23
using Microsoft.Extensions.DependencyInjection;
34

45
namespace Kralizek.Lambda
56
{
67
public static class ServiceCollectionExtensions
78
{
8-
public static IServiceCollection UseNotificationHandler<TNotification, THandler>(this IServiceCollection services)
9+
public static IServiceCollection ConfigureSnsParallelExecution(this IServiceCollection services, int maxDegreeOfParallelism)
10+
{
11+
services.Configure<ParallelSnsExecutionOptions>(option => option.MaxDegreeOfParallelism = maxDegreeOfParallelism);
12+
13+
return services;
14+
}
15+
16+
public static IServiceCollection UseNotificationHandler<TNotification, THandler>(this IServiceCollection services, bool enableParallelExecution = false)
917
where TNotification : class
1018
where THandler : class, INotificationHandler<TNotification>
1119
{
12-
services.AddTransient<IEventHandler<SNSEvent>, SnsEventHandler<TNotification>>();
20+
services.AddOptions();
21+
22+
if (enableParallelExecution)
23+
{
24+
services.AddTransient<IEventHandler<SNSEvent>, ParallelSnsEventHandler<TNotification>>();
25+
}
26+
else
27+
{
28+
services.AddTransient<IEventHandler<SNSEvent>, SnsEventHandler<TNotification>>();
29+
}
1330

1431
services.AddTransient<INotificationHandler<TNotification>, THandler>();
1532

16-
return services;
33+
return services;
1734
}
1835
}
1936
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System;
2+
using System.Linq;
3+
using System.Text.Json;
4+
using System.Threading.Tasks;
5+
using Amazon.Lambda.Core;
6+
using Amazon.Lambda.SQSEvents;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Extensions.Logging;
9+
using Microsoft.Extensions.Options;
10+
11+
namespace Kralizek.Lambda
12+
{
13+
public class ParallelSqsExecutionOptions
14+
{
15+
public int MaxDegreeOfParallelism { get; set; } = System.Environment.ProcessorCount;
16+
}
17+
18+
public class ParallelSqsEventHandler<TMessage>: IEventHandler<SQSEvent> where TMessage : class
19+
{
20+
private readonly ILogger _logger;
21+
private readonly IServiceProvider _serviceProvider;
22+
private readonly ParallelSqsExecutionOptions _options;
23+
24+
public ParallelSqsEventHandler(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ParallelSqsExecutionOptions> options)
25+
{
26+
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
27+
_logger = loggerFactory?.CreateLogger("SqsForEachAsyncEventHandler") ?? throw new ArgumentNullException(nameof(loggerFactory));
28+
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
29+
}
30+
31+
public async Task HandleAsync(SQSEvent input, ILambdaContext context)
32+
{
33+
if (input.Records.Any())
34+
{
35+
36+
await input.Records.ForEachAsync(_options.MaxDegreeOfParallelism, async singleSqsMessage =>
37+
{
38+
using (var scope = _serviceProvider.CreateScope())
39+
{
40+
var sqsMessage = singleSqsMessage.Body;
41+
_logger.LogDebug($"Message received: {sqsMessage}");
42+
43+
var message = JsonSerializer.Deserialize<TMessage>(sqsMessage);
44+
45+
var messageHandler = scope.ServiceProvider.GetService<IMessageHandler<TMessage>>();
46+
if (messageHandler == null)
47+
{
48+
_logger.LogError($"No IMessageHandler<{typeof(TMessage).Name}> could be found.");
49+
throw new InvalidOperationException($"No IMessageHandler<{typeof(TMessage).Name}> could be found.");
50+
}
51+
52+
await messageHandler.HandleAsync(message, context);
53+
}
54+
});
55+
}
56+
}
57+
}
58+
}

src/Kralizek.Lambda.Template.Sqs/ServiceCollectionExtensions.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,27 @@ namespace Kralizek.Lambda
55
{
66
public static class ServiceCollectionExtensions
77
{
8-
public static IServiceCollection UseSqsHandler<TMessage, THandler>(this IServiceCollection services)
8+
public static IServiceCollection ConfigureSnsParallelExecution(this IServiceCollection services, int maxDegreeOfParallelism)
9+
{
10+
services.Configure<ParallelSqsExecutionOptions>(option => option.MaxDegreeOfParallelism = maxDegreeOfParallelism);
11+
12+
return services;
13+
}
14+
15+
public static IServiceCollection UseSqsHandler<TMessage, THandler>(this IServiceCollection services, bool enableParallelExecution = false)
916
where TMessage : class
1017
where THandler : class, IMessageHandler<TMessage>
1118
{
12-
services.AddTransient<IEventHandler<SQSEvent>, SqsEventHandler<TMessage>>();
19+
services.AddOptions();
20+
21+
if (enableParallelExecution)
22+
{
23+
services.AddTransient<IEventHandler<SQSEvent>, ParallelSqsEventHandler<TMessage>>();
24+
}
25+
else
26+
{
27+
services.AddTransient<IEventHandler<SQSEvent>, SqsEventHandler<TMessage>>();
28+
}
1329

1430
services.AddTransient<IMessageHandler<TMessage>, THandler>();
1531

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
7+
namespace Kralizek.Lambda
8+
{
9+
public static class AsyncExtensions
10+
{
11+
/// <summary>
12+
/// Extensions on collection
13+
/// Lambda style extensions to cater a foreach with concurrency.
14+
/// </summary>
15+
/// <typeparam name="T"></typeparam>
16+
/// <param name="source">The collection please make sure the collection can handle the concurrency. If writing back to the objects in the collection</param>
17+
/// <param name="maxDegreeOfParallelism">Concurrent threads doing the async</param>
18+
/// <param name="body">The work that needs to be done.</param>
19+
/// <returns></returns>
20+
public static Task ForEachAsync<T>(this IEnumerable<T> source, int maxDegreeOfParallelism, Func<T, Task> body)
21+
{
22+
return Task.WhenAll(
23+
from partition in Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism)
24+
select Task.Run(async delegate
25+
{
26+
using (partition)
27+
while (partition.MoveNext())
28+
await body(partition.Current);
29+
}));
30+
}
31+
}
32+
}

0 commit comments

Comments
 (0)