Skip to content

Commit b4585d2

Browse files
authored
Be more tolerant of workflow errors, especially when on-error=continue (#263)
* Be more tolerant of workflow errors, especially when on-error=continue
* Can return partial results with --on-error=continue
* Add tests for partial output after failure.
1 parent 5a134aa commit b4585d2

File tree

9 files changed

+183
-53
lines changed

9 files changed

+183
-53
lines changed

cwltool/factory.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@
33
from . import workflow
44
import os
55
from .process import Process
6-
from typing import Any, Text, Union
6+
from typing import Any, Text, Union, Tuple
77
from typing import Callable as tCallable
88
import argparse
99

10+
class WorkflowStatus(Exception):
11+
def __init__(self, out, status):
12+
# type: (Dict[Text,Any], Text) -> None
13+
super(WorkflowStatus, self).__init__("Completed %s" % status)
14+
self.out = out
15+
self.status = status
16+
1017
class Callable(object):
1118
def __init__(self, t, factory): # type: (Process, Factory) -> None
1219
self.t = t
@@ -16,13 +23,17 @@ def __call__(self, **kwargs):
1623
# type: (**Any) -> Union[Text, Dict[Text, Text]]
1724
execkwargs = self.factory.execkwargs.copy()
1825
execkwargs["basedir"] = os.getcwd()
19-
return self.factory.executor(self.t, kwargs, **execkwargs)
26+
out, status = self.factory.executor(self.t, kwargs, **execkwargs)
27+
if status != "success":
28+
raise WorkflowStatus(out, status)
29+
else:
30+
return out
2031

2132
class Factory(object):
2233
def __init__(self, makeTool=workflow.defaultMakeTool,
2334
executor=main.single_job_executor,
2435
**execkwargs):
25-
# type: (tCallable[[Dict[Text, Any], Any], Process],tCallable[...,Union[Text,Dict[Text,Text]]], **Any) -> None
36+
# type: (tCallable[[Dict[Text, Any], Any], Process],tCallable[...,Tuple[Dict[Text,Any], Text]], **Any) -> None
2637
self.makeTool = makeTool
2738
self.executor = executor
2839
self.execkwargs = execkwargs

cwltool/job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ def linkoutdir(src, tgt):
314314
if processStatus != "success":
315315
_logger.warn(u"[job %s] completed %s", self.name, processStatus)
316316
else:
317-
_logger.debug(u"[job %s] completed %s", self.name, processStatus)
317+
_logger.info(u"[job %s] completed %s", self.name, processStatus)
318318

319319
if _logger.isEnabledFor(logging.DEBUG):
320320
_logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4))

cwltool/main.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
165165
help="Will be passed to `docker run` as the '--net' "
166166
"parameter. Implies '--enable-net'.")
167167

168-
parser.add_argument("--on-error", type=Text,
168+
parser.add_argument("--on-error", type=str,
169169
help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
170-
"Default is 'stop.", default="stop")
170+
"Default is 'stop'.", default="stop", choices=("stop", "continue"))
171171

172172
exgroup = parser.add_mutually_exclusive_group()
173173
exgroup.add_argument("--compute-checksum", action="store_true", default=True,
@@ -187,16 +187,12 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
187187

188188

189189
def single_job_executor(t, job_order_object, **kwargs):
190-
# type: (Process, Dict[Text, Any], **Any) -> Union[Text, Dict[Text, Text]]
190+
# type: (Process, Dict[Text, Any], **Any) -> Tuple[Dict[Text, Any], Text]
191191
final_output = []
192192
final_status = []
193193

194194
def output_callback(out, processStatus):
195195
final_status.append(processStatus)
196-
if processStatus == "success":
197-
_logger.info(u"Final process status is %s", processStatus)
198-
else:
199-
_logger.warn(u"Final process status is %s", processStatus)
200196
final_output.append(out)
201197

202198
if "basedir" not in kwargs:
@@ -223,30 +219,30 @@ def output_callback(out, processStatus):
223219

224220
try:
225221
for r in jobiter:
226-
if r.outdir:
227-
output_dirs.add(r.outdir)
228-
229222
if r:
223+
if r.outdir:
224+
output_dirs.add(r.outdir)
230225
r.run(**kwargs)
231226
else:
232-
raise WorkflowException("Workflow cannot make any more progress.")
227+
_logger.error("Workflow cannot make any more progress.")
228+
break
233229
except WorkflowException:
234230
raise
235231
except Exception as e:
236232
_logger.exception("Got workflow error")
237233
raise WorkflowException(Text(e))
238234

239-
if final_status[0] != "success":
240-
raise WorkflowException(u"Process status is %s" % (final_status))
241-
242-
if final_output[0] and finaloutdir:
235+
if final_output and final_output[0] and finaloutdir:
243236
final_output[0] = relocateOutputs(final_output[0], finaloutdir,
244237
output_dirs, kwargs.get("move_outputs"))
245238

246239
if kwargs.get("rm_tmpdir"):
247240
cleanIntermediate(output_dirs)
248241

249-
return final_output[0]
242+
if final_output and final_status:
243+
return (final_output[0], final_status[0])
244+
else:
245+
return (None, "permanentFail")
250246

251247
class FSAction(argparse.Action):
252248
objclass = None # type: Text
@@ -551,7 +547,7 @@ def versionstring():
551547

552548
def main(argsl=None, # type: List[str]
553549
args=None, # type: argparse.Namespace
554-
executor=single_job_executor, # type: Callable[..., Union[Text, Dict[Text, Text]]]
550+
executor=single_job_executor, # type: Callable[..., Tuple[Dict[Text, Any], Text]]
555551
makeTool=workflow.defaultMakeTool, # type: Callable[..., Process]
556552
selectResources=None, # type: Callable[[Dict[Text, int]], Dict[Text, int]]
557553
stdin=sys.stdin, # type: IO[Any]
@@ -714,7 +710,7 @@ def main(argsl=None, # type: List[str]
714710
setattr(args, 'basedir', job_order_object[1])
715711
del args.workflow
716712
del args.job_order
717-
out = executor(tool, job_order_object[0],
713+
(out, status) = executor(tool, job_order_object[0],
718714
makeTool=makeTool,
719715
select_resources=selectResources,
720716
make_fs_access=make_fs_access,
@@ -735,8 +731,14 @@ def locToPath(p):
735731
stdout.write(json.dumps(out, indent=4))
736732
stdout.write("\n")
737733
stdout.flush()
738-
else:
734+
735+
if status != "success":
736+
_logger.warn(u"Final process status is %s", status)
739737
return 1
738+
else:
739+
_logger.info(u"Final process status is %s", status)
740+
return 0
741+
740742
except (validate.ValidationException) as exc:
741743
_logger.error(u"Input object failed validation:\n%s", exc,
742744
exc_info=args.debug)

cwltool/workflow.py

Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ def _rec_fields(rec): # type: (Dict[Text, Any]) -> Dict[Text, Any]
141141
return False
142142
return True
143143

144-
def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceField):
145-
# type: (Dict[Text, WorkflowStateItem], List[Dict[Text, Any]], bool, bool, Text) -> Dict[Text, Any]
144+
def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceField, incomplete=False):
145+
# type: (Dict[Text, WorkflowStateItem], List[Dict[Text, Any]], bool, bool, Text, bool) -> Dict[Text, Any]
146146
inputobj = {} # type: Dict[Text, Any]
147147
for inp in parms:
148148
iid = inp["id"]
@@ -172,7 +172,7 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceFiel
172172
raise WorkflowException(
173173
u"Connect source '%s' on parameter '%s' does not "
174174
"exist" % (src, inp["id"]))
175-
else:
175+
elif not incomplete:
176176
return None
177177
elif "default" in inp:
178178
inputobj[iid] = inp["default"]
@@ -225,12 +225,13 @@ def __init__(self, workflow, **kwargs):
225225

226226
def receive_output(self, step, outputparms, jobout, processStatus):
227227
# type: (WorkflowJobStep, List[Dict[Text,Text]], Dict[Text,Text], Text) -> None
228+
228229
for i in outputparms:
229230
if "id" in i:
230231
if i["id"] in jobout:
231232
self.state[i["id"]] = WorkflowStateItem(i, jobout[i["id"]])
232233
else:
233-
_logger.error(u"Output is missing expected field %s" % i["id"])
234+
_logger.error(u"[%s] Output is missing expected field %s", step.name, i["id"])
234235
processStatus = "permanentFail"
235236

236237
if _logger.isEnabledFor(logging.DEBUG):
@@ -240,9 +241,9 @@ def receive_output(self, step, outputparms, jobout, processStatus):
240241
if self.processStatus != "permanentFail":
241242
self.processStatus = processStatus
242243

243-
_logger.warn(u"[%s] completion status is %s", step.name, processStatus)
244+
_logger.warn(u"[%s] completed %s", step.name, processStatus)
244245
else:
245-
_logger.info(u"[%s] completion status is %s", step.name, processStatus)
246+
_logger.info(u"[%s] completed %s", step.name, processStatus)
246247

247248
step.completed = True
248249

@@ -363,37 +364,52 @@ def job(self, joborder, output_callback, **kwargs):
363364
self.state[out["id"]] = None
364365

365366
completed = 0
366-
while completed < len(self.steps) and self.processStatus == "success":
367+
while completed < len(self.steps):
367368
made_progress = False
368369

369370
for step in self.steps:
370371
if kwargs.get("on_error", "stop") == "stop" and self.processStatus != "success":
371372
break
372373

373374
if not step.submitted:
374-
step.iterable = self.try_make_job(step, **kwargs)
375+
try:
376+
step.iterable = self.try_make_job(step, **kwargs)
377+
except WorkflowException as e:
378+
_logger.error(u"[%s] Cannot make job: %s", step.name, e)
379+
_logger.debug("", exc_info=True)
380+
self.processStatus = "permanentFail"
375381

376382
if step.iterable:
377-
for newjob in step.iterable:
378-
if kwargs.get("on_error", "stop") == "stop" and self.processStatus != "success":
379-
break
380-
if newjob:
381-
made_progress = True
382-
yield newjob
383-
else:
384-
break
383+
try:
384+
for newjob in step.iterable:
385+
if kwargs.get("on_error", "stop") == "stop" and self.processStatus != "success":
386+
break
387+
if newjob:
388+
made_progress = True
389+
yield newjob
390+
else:
391+
break
392+
except WorkflowException as e:
393+
_logger.error(u"[%s] Cannot make job: %s", step.name, e)
394+
_logger.debug("", exc_info=True)
395+
self.processStatus = "permanentFail"
385396

386397
completed = sum(1 for s in self.steps if s.completed)
387398

388399
if not made_progress and completed < len(self.steps):
389-
yield None
400+
if self.processStatus != "success":
401+
break
402+
else:
403+
yield None
390404

391405
supportsMultipleInput = bool(self.workflow.get_requirement("MultipleInputFeatureRequirement")[0])
392406

393-
wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource")
394-
395-
if wo is None:
396-
raise WorkflowException("Output for workflow not available")
407+
try:
408+
wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource", incomplete=True)
409+
except WorkflowException as e:
410+
_logger.error(u"[%s] Cannot collect workflow output: %s", self.name, e)
411+
wo = {}
412+
self.processStatus = "permanentFail"
397413

398414
_logger.info(u"[%s] outdir is %s", self.name, self.outdir)
399415

@@ -591,17 +607,23 @@ def setTotal(self, total): # type: (int) -> None
591607
def parallel_steps(steps, rc, kwargs): # type: (List[Generator], ReceiveScatterOutput, Dict[str, Any]) -> Generator
592608
while rc.completed < rc.total:
593609
made_progress = False
594-
for step in steps:
610+
for index in xrange(len(steps)):
611+
step = steps[index]
595612
if kwargs.get("on_error", "stop") == "stop" and rc.processStatus != "success":
596613
break
597-
for j in step:
598-
if kwargs.get("on_error", "stop") == "stop" and rc.processStatus != "success":
599-
break
600-
if j:
601-
made_progress = True
602-
yield j
603-
else:
604-
break
614+
try:
615+
for j in step:
616+
if kwargs.get("on_error", "stop") == "stop" and rc.processStatus != "success":
617+
break
618+
if j:
619+
made_progress = True
620+
yield j
621+
else:
622+
break
623+
except WorkflowException as e:
624+
_logger.error(u"Cannot make scatter job: %s", e)
625+
_logger.debug("", exc_info=True)
626+
rc.receive_scatter_output(index, {}, "permanentFail")
605627
if not made_progress and rc.completed < rc.total:
606628
yield None
607629

tests/test_examples.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,29 @@ def test_factory(self):
117117
echo = f.make("tests/echo.cwl")
118118
self.assertEqual(echo(inp="foo"), {"out": "foo\n"})
119119

120+
def test_partial_scatter(self):
121+
f = cwltool.factory.Factory(on_error="continue")
122+
fail = f.make("tests/wf/scatterfail.cwl")
123+
try:
124+
fail()
125+
except cwltool.factory.WorkflowStatus as e:
126+
self.assertEquals('sha1$e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e', e.out["out"][0]["checksum"])
127+
self.assertIsNone(e.out["out"][1])
128+
self.assertEquals('sha1$a3db5c13ff90a36963278c6a39e4ee3c22e2a436', e.out["out"][2]["checksum"])
129+
else:
130+
self.fail("Should have raised WorkflowStatus")
131+
132+
def test_partial_output(self):
133+
f = cwltool.factory.Factory(on_error="continue")
134+
fail = f.make("tests/wf/wffail.cwl")
135+
try:
136+
fail()
137+
except cwltool.factory.WorkflowStatus as e:
138+
self.assertEquals('sha1$e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e', e.out["out1"]["checksum"])
139+
self.assertNotIn("out2", e.out)
140+
else:
141+
self.fail("Should have raised WorkflowStatus")
142+
120143
class TestScanDeps(unittest.TestCase):
121144
def test_scandeps(self):
122145
obj = {

tests/wf/cat.cwl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class: CommandLineTool
2+
cwlVersion: v1.0
3+
inputs:
4+
r: File
5+
outputs: []
6+
arguments: [cat, $(inputs.r.path)]

tests/wf/echo.cwl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
class: CommandLineTool
2+
cwlVersion: v1.0
3+
inputs:
4+
r: string
5+
script:
6+
type: string
7+
default: |
8+
import sys
9+
print sys.argv[1]
10+
if sys.argv[1] == "2":
11+
exit(1)
12+
else:
13+
f = open("foo"+sys.argv[1]+".txt", "w")
14+
f.write(sys.argv[1]+"\n")
15+
outputs:
16+
out:
17+
type: File
18+
outputBinding:
19+
glob: foo$(inputs.r).txt
20+
arguments: [python, -c, $(inputs.script), $(inputs.r)]

tests/wf/scatterfail.cwl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
class: Workflow
2+
cwlVersion: v1.0
3+
requirements:
4+
ScatterFeatureRequirement: {}
5+
inputs:
6+
range:
7+
type: string[]
8+
default: ["1", "2", "3"]
9+
outputs:
10+
out:
11+
type: File[]
12+
outputSource: step1/out
13+
steps:
14+
step1:
15+
in:
16+
r: range
17+
scatter: r
18+
out: [out]
19+
run: echo.cwl
20+
step2:
21+
in:
22+
r: step1/out
23+
scatter: r
24+
out: []
25+
run: cat.cwl

0 commit comments

Comments
 (0)