diff --git a/nipype/pipeline/plugins/linear.py b/nipype/pipeline/plugins/linear.py index 9863c0f6c6..41f5c998fe 100644 --- a/nipype/pipeline/plugins/linear.py +++ b/nipype/pipeline/plugins/linear.py @@ -36,16 +36,15 @@ def run(self, graph, config, updatehash=False): donotrun = [] nodes, _ = topological_sort(graph) for node in nodes: + endstatus = 'end' try: if node in donotrun: continue if self._status_callback: self._status_callback(node, 'start') node.run(updatehash=updatehash) - if self._status_callback: - self._status_callback(node, 'end') except: - os.chdir(old_wd) + endstatus = 'exception' # bare except, but i really don't know where a # node might fail crashfile = report_crash(node) @@ -53,9 +52,17 @@ def run(self, graph, config, updatehash=False): raise # remove dependencies from queue subnodes = [s for s in dfs_preorder(graph, node)] - notrun.append( - dict(node=node, dependents=subnodes, crashfile=crashfile)) + notrun.append({'node': node, 'dependents': subnodes, + 'crashfile': crashfile}) donotrun.extend(subnodes) + # Delay raising the crash until we cleaned the house + if str2bool(config['execution']['stop_on_first_crash']): + os.chdir(old_wd) # Return wherever we were before + report_nodes_not_run(notrun) # report before raising + raise + finally: if self._status_callback: - self._status_callback(node, 'exception') + self._status_callback(node, endstatus) + + os.chdir(old_wd) # Return wherever we were before report_nodes_not_run(notrun) diff --git a/nipype/pipeline/plugins/tests/test_callback.py b/nipype/pipeline/plugins/tests/test_callback.py index 29c5cbd404..6b9525071e 100644 --- a/nipype/pipeline/plugins/tests/test_callback.py +++ b/nipype/pipeline/plugins/tests/test_callback.py @@ -6,8 +6,8 @@ from builtins import object +from time import sleep import pytest -import sys import nipype.interfaces.utility as niu import nipype.pipeline.engine as pe @@ -25,10 +25,11 @@ def __init__(self): self.statuses = [] def callback(self, node, status, result=None): - self.statuses.append((node, status)) + self.statuses.append((node.name, status)) -def test_callback_normal(tmpdir): +@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc']) +def test_callback_normal(tmpdir, plugin): tmpdir.chdir() so = Status() @@ -37,16 +38,17 @@ def test_callback_normal(tmpdir): niu.Function(function=func, input_names=[], output_names=[]), name='f_node') wf.add_nodes([f_node]) - wf.config['execution'] = {'crashdump_dir': wf.base_dir} - wf.run(plugin="Linear", plugin_args={'status_callback': so.callback}) - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'end' + wf.config['execution'] = { + 'crashdump_dir': wf.base_dir, + 'poll_sleep_duration': 2 + } + wf.run(plugin=plugin, plugin_args={'status_callback': so.callback}) + assert so.statuses == [('f_node', 'start'), ('f_node', 'end')] -def test_callback_exception(tmpdir): +@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc']) +@pytest.mark.parametrize("stop_on_first_crash", [False, True]) +def test_callback_exception(tmpdir, plugin, stop_on_first_crash): tmpdir.chdir() so = Status() @@ -55,57 +57,13 @@ def test_callback_exception(tmpdir): niu.Function(function=bad_func, input_names=[], output_names=[]), name='f_node') wf.add_nodes([f_node]) - wf.config['execution'] = {'crashdump_dir': wf.base_dir} - try: - wf.run(plugin="Linear", plugin_args={'status_callback': so.callback}) - except: - pass - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'exception' - - -def test_callback_multiproc_normal(tmpdir): - tmpdir.chdir() - - so = Status() - wf = pe.Workflow(name='test', base_dir=tmpdir.strpath) - f_node = pe.Node( - niu.Function(function=func, input_names=[], output_names=[]), - name='f_node') - wf.add_nodes([f_node]) - wf.config['execution']['crashdump_dir'] = wf.base_dir - wf.config['execution']['poll_sleep_duration'] = 2 - wf.run(plugin='MultiProc', plugin_args={'status_callback': so.callback}) - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'end' - - -def test_callback_multiproc_exception(tmpdir): - tmpdir.chdir() - - so = Status() - wf = pe.Workflow(name='test', base_dir=tmpdir.strpath) - f_node = pe.Node( - niu.Function(function=bad_func, input_names=[], output_names=[]), - name='f_node') - wf.add_nodes([f_node]) - wf.config['execution'] = {'crashdump_dir': wf.base_dir} - - try: - wf.run( - plugin='MultiProc', plugin_args={ - 'status_callback': so.callback - }) - except: - pass - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'exception' + wf.config['execution'] = { + 'crashdump_dir': wf.base_dir, + 'stop_on_first_crash': stop_on_first_crash, + 'poll_sleep_duration': 2 + } + with pytest.raises(Exception): + wf.run(plugin=plugin, plugin_args={'status_callback': so.callback}) + + sleep(0.5) # Wait for callback to be called (python 2.7) + assert so.statuses == [('f_node', 'start'), ('f_node', 'exception')]