Skip to content

Let gridmap support using num_proc as resource complex values #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions gridmap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

:var USE_MEM_FREE: Does your cluster support specifying how much memory a job
will use via mem_free? (Default: ``False``)
:var USE_NUM_PROC: Does your cluster support specifying how many procs a job
will use via num_proc? (Default: ``False``)
:var DEFAULT_QUEUE: The default job scheduling queue to use.
(Default: ``all.q``)
:var DEFAULT_TEMP_DIR: The default temporary directory for job output.
Expand Down Expand Up @@ -73,7 +75,7 @@
HEARTBEAT_FREQUENCY, IDLE_THRESHOLD,
MAX_IDLE_HEARTBEATS, MAX_TIME_BETWEEN_HEARTBEATS,
NUM_RESUBMITS, SEND_ERROR_MAIL, SMTP_SERVER,
USE_MEM_FREE, DEFAULT_TEMP_DIR)
USE_MEM_FREE, USE_NUM_PROC, DEFAULT_TEMP_DIR)
from gridmap.job import (Job, JobException, process_jobs, grid_map,
DRMAANotPresentException)
from gridmap.version import __version__, VERSION
Expand All @@ -85,4 +87,4 @@
'ERROR_MAIL_SENDER', 'HEARTBEAT_FREQUENCY', 'IDLE_THRESHOLD',
'MAX_IDLE_HEARTBEATS', 'MAX_TIME_BETWEEN_HEARTBEATS',
'NUM_RESUBMITS', 'SEND_ERROR_MAIL', 'SMTP_SERVER', 'USE_MEM_FREE',
'DEFAULT_TEMP_DIR']
'USE_NUM_PROC', 'DEFAULT_TEMP_DIR']
5 changes: 5 additions & 0 deletions gridmap/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

:var USE_MEM_FREE: Does your cluster support specifying how much memory a job
will use via mem_free? (Default: ``False``)
:var USE_NUM_PROC: Does your cluster support specifying how many procs a job
will use via num_proc? (Default: ``False``)
:var DEFAULT_QUEUE: The default job scheduling queue to use.
(Default: ``all.q``)
:var CREATE_PLOTS: Should we plot cpu and mem usage and send via email?
Expand Down Expand Up @@ -125,6 +127,9 @@
# Is mem_free configured properly on the cluster?
USE_MEM_FREE = 'TRUE' == os.getenv('USE_MEM_FREE', 'False').upper()

# Is num_proc configured properly on the cluster?
USE_NUM_PROC = 'TRUE' == os.getenv('USE_NUM_PROC', 'False').upper()

# Which queue should we use by default
DEFAULT_QUEUE = os.getenv('DEFAULT_QUEUE', 'all.q')

Expand Down
17 changes: 12 additions & 5 deletions gridmap/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
IDLE_THRESHOLD, MAX_IDLE_HEARTBEATS,
MAX_TIME_BETWEEN_HEARTBEATS, NUM_RESUBMITS,
SEND_ERROR_MAIL, SMTP_SERVER, USE_MEM_FREE,
DEFAULT_TEMP_DIR)
USE_NUM_PROC, DEFAULT_TEMP_DIR)
from gridmap.data import zdumps, zloads
from gridmap.runner import _heart_beat

Expand Down Expand Up @@ -111,14 +111,14 @@ class Job(object):
"""

__slots__ = ('_f', 'args', 'id', 'kwlist', 'cleanup', 'ret', 'traceback',
'num_slots', 'mem_free', 'white_list', 'path', 'uniq_id',
'num_slots', 'num_proc', 'mem_free', 'white_list', 'path', 'uniq_id',
'name', 'queue', 'environment', 'working_dir',
'cause_of_death', 'num_resubmits', 'home_address',
'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name',
'heart_beat', 'track_mem', 'track_cpu', 'interpreting_shell',
'copy_env')

def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
def __init__(self, f, args, kwlist=None, cleanup=True, num_proc=1, mem_free="1G",
name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE,
interpreting_shell=None, copy_env=True, add_env=None):
"""
Expand All @@ -132,6 +132,8 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
:type kwlist: dict
:param cleanup: flag that determines the cleanup of input and log file
:type cleanup: boolean
:param num_proc: Estimate of how many procs this job will need (for scheduling)
:type num_proc: int
:param mem_free: Estimate of how much memory this job will need (for
scheduling)
:type mem_free: str
Expand Down Expand Up @@ -170,6 +172,7 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
self.cleanup = cleanup
self.ret = _JOB_NOT_FINISHED
self.num_slots = num_slots
self.num_proc = num_proc
self.mem_free = mem_free
self.white_list = []
self.name = name.replace(' ', '_')
Expand Down Expand Up @@ -260,6 +263,8 @@ def native_specification(self):
ret += " -S {}".format(self.interpreting_shell)
ret += " -b yes"

if self.num_proc and USE_NUM_PROC:
ret += " -l num_proc={}".format(self.num_proc)
if self.mem_free and USE_MEM_FREE:
ret += " -l mem_free={}".format(self.mem_free)
if self.num_slots and self.num_slots > 1:
Expand Down Expand Up @@ -925,7 +930,7 @@ def _resubmit(session_id, job, temp_dir):
#####################
# MapReduce Interface
#####################
def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
def grid_map(f, args_list, cleanup=True, num_proc=1, mem_free="1G", name='gridmap_job',
num_slots=1, temp_dir=DEFAULT_TEMP_DIR, white_list=None,
queue=DEFAULT_QUEUE, quiet=True, local=False, max_processes=1,
interpreting_shell=None, copy_env=True, add_env=None,
Expand All @@ -946,6 +951,8 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
each job when we're done? (They are left in place if there's
an error.)
:type cleanup: bool
:param num_proc: Estimate of how many procs each job will need (for
scheduling). Only passed to the scheduler when ``USE_NUM_PROC`` is
enabled.
:type num_proc: int
:param mem_free: Estimate of how much memory each job will need (for
scheduling). (Only passed to the scheduler when ``USE_MEM_FREE`` is
enabled.)
Expand Down Expand Up @@ -990,7 +997,7 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',

# construct jobs
jobs = [Job(f, [args] if not isinstance(args, list) else args,
cleanup=cleanup, mem_free=mem_free,
cleanup=cleanup, num_proc=num_proc, mem_free=mem_free,
name='{}{}'.format(name, job_num), num_slots=num_slots,
queue=queue, interpreting_shell=interpreting_shell,
copy_env=copy_env, add_env=add_env)
Expand Down