Skip to content

Feat/pre middelware syncrunner #1341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/WorkflowCore/Services/SyncWorkflowRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ public class SyncWorkflowRunner : ISyncWorkflowRunner
private readonly IExecutionPointerFactory _pointerFactory;
private readonly IQueueProvider _queueService;
private readonly IDateTimeProvider _dateTimeProvider;
private readonly IWorkflowMiddlewareRunner _middlewareRunner;

public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistributedLockProvider lockService, IWorkflowRegistry registry, IPersistenceProvider persistenceStore, IExecutionPointerFactory pointerFactory, IQueueProvider queueService, IDateTimeProvider dateTimeProvider)
public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistributedLockProvider lockService, IWorkflowRegistry registry, IPersistenceProvider persistenceStore, IExecutionPointerFactory pointerFactory, IQueueProvider queueService, IDateTimeProvider dateTimeProvider, IWorkflowMiddlewareRunner middlewareRunner)
{
_host = host;
_executor = executor;
Expand All @@ -28,6 +29,7 @@ public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistr
_pointerFactory = pointerFactory;
_queueService = queueService;
_dateTimeProvider = dateTimeProvider;
_middlewareRunner = middlewareRunner;
}

public Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data,
Expand Down Expand Up @@ -67,7 +69,10 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
wf.Data = def.DataType.GetConstructor(new Type[0]).Invoke(new object[0]);
}

wf.ExecutionPointers.Add(_pointerFactory.BuildGenesisPointer(def));
var genPointer = _pointerFactory.BuildGenesisPointer(def);
wf.ExecutionPointers.Add(genPointer);

await _middlewareRunner.RunPreMiddleware(wf, def);

var id = Guid.NewGuid().ToString();

Expand Down
177 changes: 177 additions & 0 deletions test/WorkflowCore.UnitTests/Services/SyncWorkflowRunnerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
using FakeItEasy;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using WorkflowCore.Services;
using Xunit;
using System.Threading;

namespace WorkflowCore.UnitTests.Services
{
public class SyncWorkflowRunnerTests
{
protected ISyncWorkflowRunner Subject;
protected IWorkflowHost Host;
protected IPersistenceProvider PersistenceProvider;
protected IWorkflowRegistry Registry;
protected IExecutionResultProcessor ResultProcesser;
protected ILifeCycleEventPublisher EventHub;
protected ICancellationProcessor CancellationProcessor;
protected IServiceProvider ServiceProvider;
protected IScopeProvider ScopeProvider;
protected IDateTimeProvider DateTimeProvider;
protected IStepExecutor StepExecutor;
protected IExecutionPointerFactory PointerFactory;
protected IDistributedLockProvider LockService;
protected IWorkflowMiddlewareRunner MiddlewareRunner;
protected WorkflowOptions Options;
protected WorkflowExecutor Executor;

public SyncWorkflowRunnerTests()
{
Host = A.Fake<IWorkflowHost>();
PersistenceProvider = A.Fake<IPersistenceProvider>();
ServiceProvider = A.Fake<IServiceProvider>();
ScopeProvider = A.Fake<IScopeProvider>();
Registry = A.Fake<IWorkflowRegistry>();
ResultProcesser = A.Fake<IExecutionResultProcessor>();
EventHub = A.Fake<ILifeCycleEventPublisher>();
CancellationProcessor = A.Fake<ICancellationProcessor>();
DateTimeProvider = A.Fake<IDateTimeProvider>();
MiddlewareRunner = A.Fake<IWorkflowMiddlewareRunner>();
StepExecutor = A.Fake<IStepExecutor>();
PointerFactory = new ExecutionPointerFactory();
LockService = A.Fake<IDistributedLockProvider>();

Options = new WorkflowOptions(A.Fake<IServiceCollection>());

var stepExecutionScope = A.Fake<IServiceScope>();
A.CallTo(() => ScopeProvider.CreateScope(A<IStepExecutionContext>._)).Returns(stepExecutionScope);
A.CallTo(() => stepExecutionScope.ServiceProvider).Returns(ServiceProvider);

var scope = A.Fake<IServiceScope>();
var scopeFactory = A.Fake<IServiceScopeFactory>();
A.CallTo(() => ServiceProvider.GetService(typeof(IServiceScopeFactory))).Returns(scopeFactory);
A.CallTo(() => scopeFactory.CreateScope()).Returns(scope);
A.CallTo(() => scope.ServiceProvider).Returns(ServiceProvider);

A.CallTo(() => DateTimeProvider.Now).Returns(DateTime.Now);
A.CallTo(() => DateTimeProvider.UtcNow).Returns(DateTime.UtcNow);

A
.CallTo(() => ServiceProvider.GetService(typeof(IWorkflowMiddlewareRunner)))
.Returns(MiddlewareRunner);

A
.CallTo(() => ServiceProvider.GetService(typeof(IStepExecutor)))
.Returns(StepExecutor);

A.CallTo(() => MiddlewareRunner
.RunPostMiddleware(A<WorkflowInstance>._, A<WorkflowDefinition>._))
.Returns(Task.CompletedTask);

A.CallTo(() => MiddlewareRunner
.RunExecuteMiddleware(A<WorkflowInstance>._, A<WorkflowDefinition>._))
.Returns(Task.CompletedTask);

A.CallTo(() => StepExecutor.ExecuteStep(A<IStepExecutionContext>._, A<IStepBody>._))
.ReturnsLazily(call =>
call.Arguments[1].As<IStepBody>().RunAsync(
call.Arguments[0].As<IStepExecutionContext>()));

A.CallTo(() => PersistenceProvider.CreateNewWorkflow(A<WorkflowInstance>.Ignored, A<CancellationToken>.Ignored)).Returns(Guid.NewGuid().ToString());

A.CallTo(() => LockService.AcquireLock(A<string>._, A<CancellationToken>._)).Returns(true);

//config logging
var loggerFactory = new LoggerFactory();
//loggerFactory.AddConsole(LogLevel.Debug);

Executor = new WorkflowExecutor(Registry, ServiceProvider, ScopeProvider, DateTimeProvider, ResultProcesser, EventHub, CancellationProcessor, Options, loggerFactory);
Subject = new SyncWorkflowRunner(Host, Executor, LockService, Registry, PersistenceProvider, PointerFactory, A.Fake<IQueueProvider>(), DateTimeProvider, MiddlewareRunner);
}

[Fact(DisplayName = "Should run pre-middlewares for sync workflows")]
public async Task should_run_pre_middlewares()
{
//arrange
var step1Body = A.Fake<IStepBody>();
A.CallTo(() => step1Body.RunAsync(A<IStepExecutionContext>.Ignored)).Returns(ExecutionResult.Next());
WorkflowStep step1 = BuildFakeStep(step1Body);
Given1StepWorkflow(step1, "Workflow", 1);

//act
await Subject.RunWorkflowSync("Workflow", 1, new { }, "Test", TimeSpan.FromMilliseconds(1));

//assert
A.CallTo(() => MiddlewareRunner.RunPreMiddleware(A<WorkflowInstance>.Ignored, A<WorkflowDefinition>.Ignored)).MustHaveHappened();
}

private void Given1StepWorkflow(WorkflowStep step1, string id, int version)
{
A.CallTo(() => Registry.GetDefinition(id, version)).Returns(new WorkflowDefinition
{
Id = id,
Version = version,
DataType = typeof(object),
Steps = new WorkflowStepCollection
{
step1
}
});
}

private WorkflowStep BuildFakeStep(IStepBody stepBody)
{
return BuildFakeStep(stepBody, new List<IStepParameter>(), new List<IStepParameter>());
}

private WorkflowStep BuildFakeStep(IStepBody stepBody, List<IStepParameter> inputs, List<IStepParameter> outputs)
{
var result = A.Fake<WorkflowStep>();
A.CallTo(() => result.Id).Returns(0);
A.CallTo(() => result.BodyType).Returns(stepBody.GetType());
A.CallTo(() => result.ResumeChildrenAfterCompensation).Returns(true);
A.CallTo(() => result.RevertChildrenAfterCompensation).Returns(false);
A.CallTo(() => result.ConstructBody(ServiceProvider)).Returns(stepBody);
A.CallTo(() => result.Inputs).Returns(inputs);
A.CallTo(() => result.Outputs).Returns(outputs);
A.CallTo(() => result.Outcomes).Returns(new List<IStepOutcome>());
A.CallTo(() => result.InitForExecution(A<WorkflowExecutorResult>.Ignored, A<WorkflowDefinition>.Ignored, A<WorkflowInstance>.Ignored, A<ExecutionPointer>.Ignored)).Returns(ExecutionPipelineDirective.Next);
A.CallTo(() => result.BeforeExecute(A<WorkflowExecutorResult>.Ignored, A<IStepExecutionContext>.Ignored, A<ExecutionPointer>.Ignored, A<IStepBody>.Ignored)).Returns(ExecutionPipelineDirective.Next);
return result;
}

public interface IStepWithProperties : IStepBody
{
int Property1 { get; set; }
int Property2 { get; set; }
int Property3 { get; set; }
DataClass Property4 { get; set; }
}

public class DataClass
{
public int Value1 { get; set; }
public int Value2 { get; set; }
public int Value3 { get; set; }
public object Value4 { get; set; }
}

public class DynamicDataClass
{
public Dictionary<string, int> Storage { get; set; } = new Dictionary<string, int>();

public int this[string propertyName]
{
get => Storage[propertyName];
set => Storage[propertyName] = value;
}
}
}
}