Skip to content

Commit a47de61

Browse files
authored
IWF-686: Standardize unit test implementation (#102)
* IWF-686: Standardize unit test implementation * IWF-686: Refactor
1 parent f06c9de commit a47de61

37 files changed

+1607
-1513
lines changed

iwf/tests/__init__.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from iwf.registry import Registry
2+
from iwf.tests.workflows.abnormal_exit_workflow import AbnormalExitWorkflow
3+
from iwf.tests.workflows.basic_workflow import BasicWorkflow
4+
from iwf.tests.workflows.conditional_complete_workflow import (
5+
ConditionalCompleteWorkflow,
6+
)
7+
from iwf.tests.workflows.describe_workflow import DescribeWorkflow
8+
from iwf.tests.workflows.internal_channel_workflow import InternalChannelWorkflow
9+
from iwf.tests.workflows.internal_channel_workflow_with_no_prefix_channel import (
10+
InternalChannelWorkflowWithNoPrefixChannel,
11+
)
12+
from iwf.tests.workflows.persistence_data_attributes_workflow import (
13+
PersistenceDataAttributesWorkflow,
14+
)
15+
from iwf.tests.workflows.persistence_search_attributes_workflow import (
16+
PersistenceSearchAttributesWorkflow,
17+
)
18+
from iwf.tests.workflows.persistence_state_execution_local_workflow import (
19+
PersistenceStateExecutionLocalWorkflow,
20+
)
21+
from iwf.tests.workflows.recovery_workflow import RecoveryWorkflow
22+
from iwf.tests.workflows.rpc_workflow import RPCWorkflow
23+
from iwf.tests.workflows.state_options_override_workflow import (
24+
StateOptionsOverrideWorkflow,
25+
)
26+
from iwf.tests.workflows.timer_workflow import TimerWorkflow
27+
from iwf.tests.workflows.wait_for_state_with_state_execution_id_workflow import (
28+
WaitForStateWithStateExecutionIdWorkflow,
29+
)
30+
from iwf.tests.workflows.wait_for_state_with_wait_for_key_workflow import (
31+
WaitForStateWithWaitForKeyWorkflow,
32+
)
33+
from iwf.tests.workflows.wait_internal_channel_workflow import (
34+
WaitInternalChannelWorkflow,
35+
)
36+
from iwf.tests.workflows.wait_signal_workflow import WaitSignalWorkflow
37+
38+
registry = Registry()
39+
40+
registry.add_workflow(AbnormalExitWorkflow())
41+
registry.add_workflow(BasicWorkflow())
42+
registry.add_workflow(ConditionalCompleteWorkflow())
43+
registry.add_workflow(DescribeWorkflow())
44+
registry.add_workflow(InternalChannelWorkflow())
45+
registry.add_workflow(InternalChannelWorkflowWithNoPrefixChannel())
46+
registry.add_workflow(PersistenceDataAttributesWorkflow())
47+
registry.add_workflow(PersistenceSearchAttributesWorkflow())
48+
registry.add_workflow(PersistenceStateExecutionLocalWorkflow())
49+
registry.add_workflow(RecoveryWorkflow())
50+
registry.add_workflow(RPCWorkflow())
51+
registry.add_workflow(TimerWorkflow())
52+
registry.add_workflow(StateOptionsOverrideWorkflow())
53+
registry.add_workflow(WaitForStateWithStateExecutionIdWorkflow())
54+
registry.add_workflow(WaitForStateWithWaitForKeyWorkflow())
55+
registry.add_workflow(WaitInternalChannelWorkflow())
56+
registry.add_workflow(WaitSignalWorkflow())
Lines changed: 13 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,34 @@
11
import inspect
22
import time
33
import unittest
4-
from typing import Union
54

65
from iwf.client import Client
7-
from iwf.command_results import CommandResults
8-
from iwf.communication import Communication
96
from iwf.errors import WorkflowFailed
10-
from iwf.iwf_api.models import RetryPolicy
117
from iwf.iwf_api.models.id_reuse_policy import IDReusePolicy
12-
from iwf.persistence import Persistence
13-
from iwf.state_decision import StateDecision
14-
from iwf.state_schema import StateSchema
15-
from iwf.tests.test_basic_workflow import BasicWorkflow
168
from iwf.tests.worker_server import registry
17-
from iwf.workflow import ObjectWorkflow
18-
from iwf.workflow_context import WorkflowContext
9+
from iwf.tests.workflows.abnormal_exit_workflow import AbnormalExitWorkflow
10+
from iwf.tests.workflows.basic_workflow import BasicWorkflow
1911
from iwf.workflow_options import WorkflowOptions
20-
from iwf.workflow_state import T, WorkflowState
21-
from iwf.workflow_state_options import WorkflowStateOptions
22-
23-
24-
class AbnormalExitState1(WorkflowState[Union[int, str]]):
25-
def execute(
26-
self,
27-
ctx: WorkflowContext,
28-
input: T,
29-
command_results: CommandResults,
30-
persistence: Persistence,
31-
communication: Communication,
32-
) -> StateDecision:
33-
raise RuntimeError("abnormal exit state")
34-
35-
def get_state_options(self) -> WorkflowStateOptions:
36-
return WorkflowStateOptions(
37-
execute_api_retry_policy=RetryPolicy(maximum_attempts=1)
38-
)
39-
40-
41-
class AbnormalExitWorkflow(ObjectWorkflow):
42-
def get_workflow_states(self) -> StateSchema:
43-
return StateSchema.with_starting_state(AbnormalExitState1())
44-
45-
46-
abnormal_exit_wf = AbnormalExitWorkflow()
47-
registry.add_workflow(abnormal_exit_wf)
48-
client = Client(registry)
4912

5013

5114
class TestAbnormalWorkflow(unittest.TestCase):
15+
@classmethod
16+
def setUpClass(cls):
17+
cls.client = Client(registry)
18+
5219
def test_abnormal_exit_workflow(self):
5320
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
54-
startOptions = WorkflowOptions(
21+
start_options = WorkflowOptions(
5522
workflow_id_reuse_policy=IDReusePolicy.ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY
5623
)
5724

58-
client.start_workflow(AbnormalExitWorkflow, wf_id, 100, "input", startOptions)
25+
self.client.start_workflow(
26+
AbnormalExitWorkflow, wf_id, 100, "input", start_options
27+
)
5928
with self.assertRaises(WorkflowFailed):
60-
client.get_simple_workflow_result_with_wait(wf_id, str)
29+
self.client.wait_for_workflow_completion(wf_id, str)
6130

6231
# Starting a workflow with the same ID should be allowed since the previous failed abnormally
63-
client.start_workflow(BasicWorkflow, wf_id, 100, "input", startOptions)
64-
res = client.get_simple_workflow_result_with_wait(wf_id, str)
32+
self.client.start_workflow(BasicWorkflow, wf_id, 100, "input", start_options)
33+
res = self.client.wait_for_workflow_completion(wf_id, str)
6534
assert res == "done"

iwf/tests/test_basic_workflow.py

Lines changed: 9 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,20 @@
11
import inspect
22
import time
33
import unittest
4-
from typing import Union
54

65
from iwf.client import Client
7-
from iwf.command_request import CommandRequest, TimerCommand
8-
from iwf.command_results import CommandResults
9-
from iwf.communication import Communication
106
from iwf.errors import WorkflowAlreadyStartedError
117
from iwf.iwf_api.models import WorkflowAlreadyStartedOptions
12-
from iwf.persistence import Persistence
13-
from iwf.state_decision import StateDecision
14-
from iwf.state_schema import StateSchema
158
from iwf.tests.worker_server import registry
16-
from iwf.workflow import ObjectWorkflow
17-
from iwf.workflow_context import WorkflowContext
9+
from iwf.tests.workflows.basic_workflow import BasicWorkflow
1810
from iwf.workflow_options import WorkflowOptions
19-
from iwf.workflow_state import T, WorkflowState
20-
21-
22-
class State1(WorkflowState[Union[int, str]]):
23-
def wait_until(
24-
self,
25-
ctx: WorkflowContext,
26-
input: T,
27-
persistence: Persistence,
28-
communication: Communication,
29-
) -> CommandRequest:
30-
if input != "input":
31-
raise RuntimeError("input is incorrect")
32-
return CommandRequest.for_all_command_completed(
33-
TimerCommand.by_seconds(1),
34-
)
35-
36-
def execute(
37-
self,
38-
ctx: WorkflowContext,
39-
input: T,
40-
command_results: CommandResults,
41-
persistence: Persistence,
42-
communication: Communication,
43-
) -> StateDecision:
44-
if input != "input":
45-
raise RuntimeError("input is incorrect")
46-
return StateDecision.single_next_state(State2)
47-
48-
49-
class State2(WorkflowState[None]):
50-
def execute(
51-
self,
52-
ctx: WorkflowContext,
53-
input: T,
54-
command_results: CommandResults,
55-
persistence: Persistence,
56-
communication: Communication,
57-
) -> StateDecision:
58-
return StateDecision.graceful_complete_workflow("done")
59-
60-
61-
class BasicWorkflow(ObjectWorkflow):
62-
def get_workflow_states(self) -> StateSchema:
63-
return StateSchema.with_starting_state(State1(), State2())
64-
65-
66-
hello_wf = BasicWorkflow()
67-
registry.add_workflow(hello_wf)
68-
client = Client(registry)
6911

7012

7113
class TestWorkflowErrors(unittest.TestCase):
14+
@classmethod
15+
def setUpClass(cls):
16+
cls.client = Client(registry)
17+
7218
def test_basic_workflow(self):
7319
original_request_id = "1"
7420
later_request_id = "2"
@@ -85,12 +31,12 @@ def test_basic_workflow(self):
8531
workflow_already_started_options_1
8632
)
8733

88-
wf_run_id = client.start_workflow(
34+
wf_run_id = self.client.start_workflow(
8935
BasicWorkflow, wf_id, 100, "input", start_options_1
9036
)
9137
assert wf_run_id
9238

93-
wf_run_id = client.start_workflow(
39+
wf_run_id = self.client.start_workflow(
9440
BasicWorkflow, wf_id, 100, "input", start_options_1
9541
)
9642
assert wf_run_id
@@ -106,10 +52,10 @@ def test_basic_workflow(self):
10652
)
10753

10854
with self.assertRaises(WorkflowAlreadyStartedError):
109-
wf_run_id = client.start_workflow(
55+
wf_run_id = self.client.start_workflow(
11056
BasicWorkflow, wf_id, 100, "input", start_options_2
11157
)
11258
assert wf_run_id
11359

114-
res = client.wait_for_workflow_completion(wf_id, str)
60+
res = self.client.wait_for_workflow_completion(wf_id, str)
11561
assert res == "done"

iwf/tests/test_conditional_complete.py

Lines changed: 21 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -3,123 +3,40 @@
33
import unittest
44

55
from iwf.client import Client
6-
from iwf.command_request import (
7-
CommandRequest,
8-
InternalChannelCommand,
9-
SignalChannelCommand,
10-
)
11-
from iwf.command_results import CommandResults
12-
from iwf.communication import Communication
13-
from iwf.communication_schema import CommunicationMethod, CommunicationSchema
14-
from iwf.persistence import Persistence
15-
from iwf.persistence_schema import PersistenceField, PersistenceSchema
16-
from iwf.rpc import rpc
17-
from iwf.state_decision import StateDecision
18-
from iwf.state_schema import StateSchema
196
from iwf.tests.worker_server import registry
20-
from iwf.workflow import ObjectWorkflow
21-
from iwf.workflow_context import WorkflowContext
22-
from iwf.workflow_state import T, WorkflowState
23-
24-
test_signal_channel = "test-1"
25-
test_internal_channel = "test-2"
26-
27-
da_counter = "counter"
28-
29-
30-
class WaitState(WorkflowState[bool]):
31-
def wait_until(
32-
self,
33-
ctx: WorkflowContext,
34-
use_signal: T,
35-
persistence: Persistence,
36-
communication: Communication,
37-
) -> CommandRequest:
38-
if use_signal:
39-
return CommandRequest.for_all_command_completed(
40-
SignalChannelCommand.by_name(test_signal_channel),
41-
)
42-
else:
43-
return CommandRequest.for_all_command_completed(
44-
InternalChannelCommand.by_name(test_internal_channel),
45-
)
46-
47-
def execute(
48-
self,
49-
ctx: WorkflowContext,
50-
use_signal: T,
51-
command_results: CommandResults,
52-
persistence: Persistence,
53-
communication: Communication,
54-
) -> StateDecision:
55-
counter = persistence.get_data_attribute(da_counter)
56-
if counter is None:
57-
counter = 0
58-
counter += 1
59-
persistence.set_data_attribute(da_counter, counter)
60-
61-
if ctx.state_execution_id == "WaitState-1":
62-
# wait for 3 seconds so that the channel can have a new message
63-
time.sleep(3)
64-
if use_signal:
65-
return StateDecision.force_complete_if_signal_channel_empty_or_else(
66-
test_signal_channel, counter, WaitState, use_signal
67-
)
68-
else:
69-
return StateDecision.force_complete_if_internal_channel_empty_or_else(
70-
test_internal_channel, counter, WaitState, use_signal
71-
)
72-
73-
74-
class ConditionalCompleteWorkflow(ObjectWorkflow):
75-
def get_communication_schema(self) -> CommunicationSchema:
76-
return CommunicationSchema.create(
77-
CommunicationMethod.signal_channel_def(test_signal_channel, int),
78-
CommunicationMethod.internal_channel_def(test_internal_channel, int),
79-
)
80-
81-
def get_persistence_schema(self) -> PersistenceSchema:
82-
return PersistenceSchema.create(
83-
PersistenceField.data_attribute_def(da_counter, int),
84-
)
85-
86-
def get_workflow_states(self) -> StateSchema:
87-
return StateSchema.with_starting_state(WaitState())
88-
89-
@rpc()
90-
def test_rpc_publish_channel(self, com: Communication):
91-
com.publish_to_internal_channel(test_internal_channel, 0)
7+
from iwf.tests.workflows.conditional_complete_workflow import (
8+
ConditionalCompleteWorkflow,
9+
test_signal_channel,
10+
)
9211

9312

9413
class TestConditionalComplete(unittest.TestCase):
9514
@classmethod
9615
def setUpClass(cls):
97-
wf = ConditionalCompleteWorkflow()
98-
registry.add_workflow(wf)
16+
cls.client = Client(registry)
9917

10018
def test_internal_channel_conditional_complete(self):
10119
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
102-
self.do_test_conditional_workflow(wf_id, False)
20+
do_test_conditional_workflow(self.client, wf_id, False)
10321

10422
def test_signal_channel_conditional_complete(self):
10523
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
106-
self.do_test_conditional_workflow(wf_id, True)
24+
do_test_conditional_workflow(self.client, wf_id, True)
10725

108-
def do_test_conditional_workflow(self, wf_id: str, use_signal: bool):
109-
self.client = Client(registry)
11026

111-
self.client.start_workflow(ConditionalCompleteWorkflow, wf_id, 10, use_signal)
27+
def do_test_conditional_workflow(client: Client, wf_id: str, use_signal: bool):
28+
client.start_workflow(ConditionalCompleteWorkflow, wf_id, 10, use_signal)
11229

113-
for x in range(3):
114-
if use_signal:
115-
self.client.signal_workflow(wf_id, test_signal_channel, 123)
116-
else:
117-
self.client.invoke_rpc(
118-
wf_id, ConditionalCompleteWorkflow.test_rpc_publish_channel
119-
)
120-
if x == 0:
121-
# wait for a second so that the workflow is in execute state
122-
time.sleep(1)
30+
for x in range(3):
31+
if use_signal:
32+
client.signal_workflow(wf_id, test_signal_channel, 123)
33+
else:
34+
client.invoke_rpc(
35+
wf_id, ConditionalCompleteWorkflow.test_rpc_publish_channel
36+
)
37+
if x == 0:
38+
# wait for a second so that the workflow is in execute state
39+
time.sleep(1)
12340

124-
res = self.client.get_simple_workflow_result_with_wait(wf_id)
125-
assert res == 3
41+
res = client.wait_for_workflow_completion(wf_id)
42+
assert res == 3

0 commit comments

Comments
 (0)