Skip to content

Commit 5d59c49

Browse files
authored
Merge pull request #111 from cokelaer/main
Add ability to introspect slurm files
2 parents cca6c17 + c0301b9 commit 5d59c49

File tree

9 files changed

+173
-37
lines changed

9 files changed

+173
-37
lines changed

README.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
:Overview: A set of tools to help building or using Sequana pipelines
2424
:Status: Production
25-
:Issues: Please fill a report on `github <https://github.com/sequana/sequana/issues>`__
25+
:Issues: Please fill a report on `github <https://github.com/sequana/sequana_pipetools/issues>`__
2626
:Python version: Python 3.8, 3.9, 3.10, 3.11
2727
:Citation: Cokelaer et al, (2017), ‘Sequana’: a Set of Snakemake NGS pipelines, Journal of Open Source Software, 2(16), 352, `JOSS DOI doi:10.21105/joss.00352 <http://www.doi2bib.org/bib/10.21105%2Fjoss.00352>`_
2828

@@ -313,6 +313,7 @@ Changelog
313313
========= ======================================================================
314314
Version Description
315315
========= ======================================================================
316+
1.0.5 * introspect slurm files to extract stats
316317
1.0.4 * add utility function to download and untar a tar.gz file
317318
1.0.3 * add levenshtein function. some typo corrections.
318319
1.0.2 * add the dot2png command. pin docutils <0.21 due to pip error

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
66
#maintainer ?#maintainer email
77
[tool.poetry]
88
name = "sequana_pipetools"
9-
version = "1.0.4"
9+
version = "1.0.5"
1010
description = "A set of tools to help building or using Sequana pipelines"
1111
authors = ["Sequana Team"]
1212
license = "BSD-3"

sequana_pipetools/misc.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def download_and_extract_tar_gz(url, extract_to):
4242
os.makedirs(extract_to, exist_ok=True)
4343

4444
# Download the file
45-
logger.info(f"Downloading {filename}...")
45+
logger.info(f"Downloaded {filename} to {file_path}")
4646
response = requests.get(url, stream=True)
4747
response.raise_for_status() # Raise an exception for HTTP errors
4848
total_size = int(response.headers.get("content-length", 0))
@@ -60,21 +60,15 @@ def download_and_extract_tar_gz(url, extract_to):
6060
file.write(chunk)
6161
bar.update(len(chunk))
6262

63-
logger.info(f"Downloaded {filename} to {file_path}")
64-
6563
# Extract the tar.gz file
6664
if tarfile.is_tarfile(file_path):
67-
logger.info(f"Extracting {filename}...")
68-
6965
with tarfile.open(file_path, "r:gz") as tar:
7066
tar.extractall(path=extract_to)
71-
logger.info(f"Extracted to {extract_to}")
7267
else:
73-
logger.info(f"{file_path} is not a valid tar.gz file.")
68+
logger.warning(f"{file_path} is not a valid tar.gz file.")
7469

7570
# Optionally, you can delete the .tar.gz file after extraction
7671
os.remove(file_path)
77-
logger.info("Process completed.")
7872

7973

8074
def levenshtein_distance(token1: str, token2: str) -> int:

sequana_pipetools/sequana_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ def teardown(self, check_schema=True, check_input_files=True):
417417
msg += "cd {}; sbatch {}.sh\n\n".format(self.workdir, self.name)
418418
else:
419419
msg += "cd {}; sh {}.sh\n\n".format(self.workdir, self.name)
420-
msg += f"You may tune extra parameters related to snakemake in {self.workdir}/{self.name}/.sequana/profile_{self.options.profile}"
420+
msg += f"You may tune extra parameters related to snakemake in {self.workdir}/.sequana/profile_{self.options.profile}"
421421
print(self.colors.purple(msg))
422422

423423
# Save an info.txt with the command used

sequana_pipetools/snaketools/pipeline_manager.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from sequana_pipetools import get_package_version
2020
from sequana_pipetools.misc import PipetoolsException
2121
from sequana_pipetools.snaketools.errors import PipeError
22+
from sequana_pipetools.snaketools.slurm import SlurmStats
2223

2324
from .file_factory import FastQFactory, FileFactory
2425
from .module import Pipeline
@@ -142,8 +143,10 @@ def onerror(self):
142143
try:
143144
p = PipeError(self.name)
144145
p.status()
145-
print(f"\nIf you encoutered an error using sequana_{self.name}, please copy paste the above message and create a New Issue on https://github.com/sequana/{self.name}/issues")
146-
except Exception as err: #pragma: no cover
146+
print(
147+
f"\nIf you encoutered an error using sequana_{self.name}, please copy paste the above message and create a New Issue on https://github.com/sequana/{self.name}/issues"
148+
)
149+
except Exception as err: # pragma: no cover
147150
print
148151

149152
def teardown(self, extra_dirs_to_remove=[], extra_files_to_remove=[], outdir="."):
@@ -154,10 +157,11 @@ def teardown(self, extra_dirs_to_remove=[], extra_files_to_remove=[], outdir="."
154157
cleaner.add_makefile()
155158

156159
# create the version file given the requirements
157-
if os.path.exists(".sequana/tools.txt"):
158-
with open(".sequana/tools.txt", "r") as fin:
160+
161+
if os.path.exists(f"{outdir}/.sequana/tools.txt"):
162+
with open(f"{outdir}/.sequana/tools.txt", "r") as fin:
159163
deps = fin.readlines()
160-
with open(".sequana/versions.txt", "w") as fout:
164+
with open(f"{outdir}/.sequana/versions.txt", "w") as fout:
161165
from versionix.parser import get_version
162166

163167
for dep in deps:
@@ -172,7 +176,16 @@ def teardown(self, extra_dirs_to_remove=[], extra_files_to_remove=[], outdir="."
172176
print("\u2705 Another successful analysis. Open summary.html in your browser. Have fun.")
173177
else:
174178
print("\u2705 Another successful analysis. Have fun.")
175-
print("\u2705 Please consider citing us would you use Sequana in your research. See https://sequana.readthedocs.io or cite: \n\n\tCokelaer et al. Sequana': a Set of Snakemake NGS pipelines, (2007) JOSS 2(16)")
179+
print(
180+
"\u2705 Please consider citing us would you use Sequana in your research. See https://sequana.readthedocs.io or cite: \n\n\tCokelaer et al. Sequana': a Set of Snakemake NGS pipelines, (2007) JOSS 2(16)"
181+
)
182+
183+
# for HPC with slurm only
184+
try:
185+
slurm_stats = SlurmStats(outdir)
186+
slurm_stats.to_csv(f"{outdir}.sequana/slurm_stats.txt")
187+
except Exception:
188+
pass
176189

177190
def get_html_summary(self, float="left", width=30):
178191
import pandas as pd

sequana_pipetools/snaketools/slurm.py

Lines changed: 98 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,111 @@
1010
# Documentation: http://sequana.readthedocs.io
1111
# Contributors: https://github.com/sequana/sequana/graphs/contributors
1212
##############################################################################
13+
import re
14+
import subprocess
1315
from pathlib import Path
1416

1517
import colorlog
1618
import parse
1719

1820
logger = colorlog.getLogger(__name__)
1921

22+
__all__ = ["SlurmStats", "SlurmParsing"]
2023

21-
class SlurmParsing:
24+
25+
class SlurmData:
26+
def __init__(self, working_directory, logs_directory="logs", pattern="*/*slurm*.out"):
27+
28+
# get the master slurm file
29+
main_slurms = list(Path(working_directory).glob("slurm-*"))
30+
31+
try:
32+
self.master = sorted(main_slurms)[-1]
33+
print(f"Found slurm master {self.master}")
34+
except Exception as err:
35+
self.master = None
36+
37+
log_dir = Path(working_directory) / logs_directory
38+
self.slurms = sorted([f for f in log_dir.glob(pattern)])
39+
40+
41+
# not tested because requires sacct command
42+
class SlurmStats(SlurmData): # pragma: nocover
43+
def __init__(self, working_directory, logs_directory="logs", pattern="*/*slurm*.out"):
44+
super(SlurmStats, self).__init__(working_directory, logs_directory, pattern)
45+
46+
results = []
47+
logger.info(f"Introspecting {len(self.slurms)} slurm files")
48+
for filename in self.slurms:
49+
50+
ID = filename.name.split("-slurm-")[-1].replace(".out", "")
51+
task = filename.name.split("-")[0]
52+
53+
# get slurm ID
54+
cmd = f"sacct -j {ID} --format maxRSS,AllocCPUS,Elapsed,CPUTime"
55+
call = subprocess.run(cmd.split(), stdout=subprocess.PIPE)
56+
if call.returncode == 0:
57+
jobinfo = self._parse_sacct_output(call.stdout.decode())
58+
results.append([task] + jobinfo)
59+
else:
60+
print(cmd)
61+
62+
self.results = results
63+
self.columns = ["task", "memory_gb", "thread", "time", "cpu_time"]
64+
65+
def to_csv(self, outfile):
66+
with open(outfile, "w") as fout:
67+
fout.write(",".join(self.columns))
68+
for result in self.results:
69+
fout.write(",".join([str(x) for x in result]))
70+
71+
def _parse_sacct_output(self, output):
72+
"""Function to parse sacct output
73+
74+
The output is suppose to have 4 entries in this order:
75+
MaxRSS AllocCPUS Elapsed CPUTime and solely used by :class:`~SlurmStats`
76+
77+
"""
78+
# Split the output into lines and remove the header
79+
lines = output.strip().split("\n")[2:]
80+
81+
# Initialize a list to store the values of interest
82+
job_info = []
83+
84+
# Regex to match the values
85+
value_regex = re.compile(r"(\S+)?\s+(\d+)\s+(\d{2}:\d{2}:\d{2})\s+(\d{2}:\d{2}:\d{2})")
86+
87+
for i, line in enumerate(lines):
88+
match = value_regex.search(line)
89+
if match:
90+
# Extract values from regex groups
91+
maxrss = match.group(1) if match.group(1) else "0K" # Handle empty MaxRSS case
92+
alloccpus = int(match.group(2))
93+
elapsed = match.group(3)
94+
cputime = match.group(4)
95+
96+
# Only keep the second line (main job)
97+
if i == 1:
98+
# Convert MaxRSS from KB to GB
99+
maxrss_gb = self._kb_to_gb(maxrss)
100+
# Append parsed values to the job_info list
101+
job_info = [maxrss_gb, alloccpus, elapsed, cputime]
102+
break
103+
104+
# Return the list of job information
105+
return job_info
106+
107+
def _kb_to_gb(self, kb_str):
108+
# Remove the 'K' and convert to float
109+
kb = float(kb_str.replace("K", ""))
110+
111+
# Convert kilobytes to gigabytes (1 GB = 1024^2 KB)
112+
gb = kb / (1024**2)
113+
114+
return round(gb, 6) # Round to six decimal places for precision
115+
116+
117+
class SlurmParsing(SlurmData):
22118
"""Helper for sequana jobs debugging on slurm cluster.
23119
24120
Assumptions:
@@ -49,18 +145,7 @@ class SlurmParsing:
49145
}
50146

51147
def __init__(self, working_directory, logs_directory="logs", pattern="*/*slurm*.out"):
52-
53-
# get the master slurm file
54-
main_slurms = list(Path(working_directory).glob("slurm-*"))
55-
56-
try:
57-
self.master = sorted(main_slurms)[-1]
58-
print(f"Found slurm master {self.master}")
59-
except Exception as err:
60-
self.master = None
61-
62-
log_dir = Path(working_directory) / logs_directory
63-
self.slurms = sorted([f for f in log_dir.glob(pattern)])
148+
super(SlurmParsing, self).__init__(working_directory, logs_directory, pattern)
64149

65150
# no sys exit (even zero) since it is used within snakemake
66151
N = len(self.slurms)

tests/scripts/test_main.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import sys
21
import os
2+
import sys
33

44
from click.testing import CliRunner
55

@@ -58,10 +58,8 @@ def test_slurm_diag():
5858
assert results.exit_code == 0
5959

6060

61-
def test_dot2png():
61+
def test_dot2png(tmpdir):
    """The ``--dot2png`` option must exit with status 0 on the sample DAG file."""
    dag_path = os.path.join(test_dir, "..", "data", "test_dag.dot")
    outcome = CliRunner().invoke(main, ["--dot2png", dag_path])
    assert outcome.exit_code == 0
66-
67-

tests/snaketools/test_pipeline_manager.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,14 @@ def test_pipeline_manager(tmpdir):
2929
pm = snaketools.PipelineManager("custom", cfg)
3030
assert not pm.paired
3131
working_dir = tmpdir.mkdir("temp")
32-
pm.teardown(outdir=working_dir)
32+
33+
# when using the pipeline manager, it does not create .sequana, tools, etc
34+
# this is created by the sequana_pipetools.sequana_manager
35+
(working_dir / ".sequana").mkdir()
36+
ff = Path(working_dir / ".sequana" / "tools.txt")
37+
ff.touch()
38+
ff = Path(working_dir / ".sequana" / "version.txt")
39+
ff.touch()
3340

3441
# here not readtag provided, so data is considered to be non-fastq related
3542
# or at least not paired
@@ -197,7 +204,6 @@ def test_directory():
197204
pm = snaketools.pipeline_manager.PipelineManagerDirectory("test", cfg)
198205

199206

200-
201207
def test_pipeline_others():
202208
cfg = SequanaConfig({})
203209
file1 = os.path.join(test_dir, "data", "Hm2_GTGAAA_L005_R1_001.fastq.gz")

tests/snaketools/test_slurm.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from pathlib import Path
2+
from unittest.mock import MagicMock, patch
23

34
import pytest
45

5-
from sequana_pipetools.snaketools.slurm import SlurmParsing
6+
from sequana_pipetools.snaketools.slurm import SlurmParsing, SlurmStats
67

78
from . import test_dir
89

@@ -19,3 +20,41 @@ def test_slurm():
1920
print(dj)
2021
dj
2122
dj._report()
23+
24+
25+
@pytest.fixture
def mock_sacct_output():
    # Fixture returning a canned sacct report: a header line, a dashes line and
    # three data rows (the first one has no MaxRSS value).
    # NOTE(review): column spacing inside the literal is runtime data; confirm
    # it against the repository before relying on exact whitespace.
    # This is the mock output for the sacct command, including the header
    return """
MaxRSS AllocCPUS Elapsed CPUTime
---------- ---------- ---------- ----------
1 00:00:24 00:00:24
264364K 1 00:00:24 00:00:24
36K 1 00:00:24 00:00:24
"""
35+
36+
37+
def test_parse_sacct_output(mock_sacct_output):
    """Parsing the canned sacct report yields the stats of the second data row."""
    # No slurm file is expected under ./logs, so no sacct call is issued here
    stats = SlurmStats(working_directory=".", logs_directory="logs")

    parsed = stats._parse_sacct_output(mock_sacct_output)

    expected = [0.252117, 1, "00:00:24", "00:00:24"]
    assert parsed == expected
46+
47+
48+
@patch("subprocess.run")
def test_slurm_stats_with_mocked_sacct(mock_run, mock_sacct_output, tmpdir):
    """SlurmStats builds one result per slurm file when sacct is mocked."""
    # Every sacct call succeeds and prints the canned report
    fake_call = MagicMock(stdout=mock_sacct_output.encode("utf-8"), returncode=0)
    mock_run.return_value = fake_call

    stats = SlurmStats(working_directory=sharedir / "slurm_error1")

    # One entry per slurm log of the shared data set, none of them empty
    assert len(stats.results) == 9
    assert stats.results[0]

    csv_path = str(tmpdir.join("test.csv"))
    stats.to_csv(csv_path)

0 commit comments

Comments
 (0)