Skip to content

Fix run_before_script() output capturing #959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ $ pipx install --suffix=@next 'tmuxp' --pip-args '\--pre' --force

<!-- Maintainers, insert changes / features for the next release here -->

### Bug fixes

- `run_before_script()`: Fix output issue (#959)

## tmuxp 1.52.0 (2025-02-02)

_Maintenance only, no bug fixes or new features_
Expand Down
91 changes: 61 additions & 30 deletions src/tmuxp/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,72 @@ def run_before_script(
script_file: str | pathlib.Path,
cwd: pathlib.Path | None = None,
) -> int:
"""Execute a shell script, wraps :meth:`subprocess.check_call()` in a try/catch."""
"""Execute shell script, ``tee``-ing output to both terminal (if TTY) and buffer."""
script_cmd = shlex.split(str(script_file))

try:
proc = subprocess.Popen(
shlex.split(str(script_file)),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
script_cmd,
cwd=cwd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, # decode to str
errors="backslashreplace",
encoding="utf-8",
)
if proc.stdout is not None:
for line in iter(proc.stdout.readline, ""):
sys.stdout.write(line)
proc.wait()

if proc.returncode and proc.stderr is not None:
stderr = proc.stderr.read()
proc.stderr.close()
stderr_strlist = stderr.split("\n")
stderr_str = "\n".join(list(filter(None, stderr_strlist))) # filter empty

raise exc.BeforeLoadScriptError(
proc.returncode,
os.path.abspath(script_file), # NOQA: PTH100
stderr_str,
)
except OSError as e:
if e.errno == 2:
raise exc.BeforeLoadScriptNotExists(
e,
os.path.abspath(script_file), # NOQA: PTH100
) from e
raise
return proc.returncode
except FileNotFoundError as e:
raise exc.BeforeLoadScriptNotExists(
e,
os.path.abspath(script_file), # NOQA: PTH100
) from e

out_buffer = []
err_buffer = []

# While process is running, read lines from stdout/stderr
# and write them to this process's stdout/stderr if isatty
is_out_tty = sys.stdout.isatty()
is_err_tty = sys.stderr.isatty()

# You can do a simple loop reading in real-time:
while True:
# Use .poll() to check if the child has exited
return_code = proc.poll()

# Read one line from stdout, if available
line_out = proc.stdout.readline() if proc.stdout else ""

# Read one line from stderr, if available
line_err = proc.stderr.readline() if proc.stderr else ""

if line_out:
out_buffer.append(line_out)
if is_out_tty:
sys.stdout.write(line_out)
sys.stdout.flush()

if line_err:
err_buffer.append(line_err)
if is_err_tty:
sys.stderr.write(line_err)
sys.stderr.flush()

# If no more data from pipes and process ended, break
if not line_out and not line_err and return_code is not None:
break

# At this point, the process has finished
return_code = proc.wait()

if return_code != 0:
# Join captured stderr lines for your exception
stderr_str = "".join(err_buffer).strip()
raise exc.BeforeLoadScriptError(
return_code,
os.path.abspath(script_file), # NOQA: PTH100
stderr_str,
)

return return_code


def oh_my_zsh_auto_title() -> None:
Expand Down
55 changes: 54 additions & 1 deletion tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import pathlib
import sys
import typing as t

import pytest
Expand Down Expand Up @@ -35,8 +37,59 @@ def test_run_before_script_raise_BeforeLoadScriptError_if_retcode() -> None:
run_before_script(script_file)


def test_return_stdout_if_ok(capsys: pytest.CaptureFixture[str]) -> None:
@pytest.fixture
def temp_script(tmp_path: pathlib.Path) -> pathlib.Path:
"""Fixture of an example script that prints "Hello, world!"."""
script = tmp_path / "test_script.sh"
script.write_text(
"""#!/bin/sh
echo "Hello, World!"
exit 0
"""
)
script.chmod(0o755)
return script


@pytest.mark.parametrize(
["isatty_value", "expected_output"],
[
(True, "Hello, World!"), # if stdout is a TTY, output should be passed through
(False, ""), # if not a TTY, output is not written to sys.stdout
],
)
def test_run_before_script_isatty(
temp_script: pathlib.Path,
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
isatty_value: bool,
expected_output: str,
) -> None:
"""Verify behavior of ``isatty()``, which we mock in `run_before_script()`."""
# Mock sys.stdout.isatty() to return the desired value.
monkeypatch.setattr(sys.stdout, "isatty", lambda: isatty_value)

# Run the script.
returncode = run_before_script(temp_script)

# Assert that the script ran successfully.
assert returncode == 0

out, _err = capsys.readouterr()

# In TTY mode, we expect the output; in non-TTY mode, we expect it to be suppressed.
assert expected_output in out


def test_return_stdout_if_ok(
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""run_before_script() returns stdout if script succeeds."""
# Simulate sys.stdout.isatty() + sys.stderr.isatty()
monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
monkeypatch.setattr(sys.stderr, "isatty", lambda: True)

script_file = FIXTURE_PATH / "script_complete.sh"

run_before_script(script_file)
Expand Down