50
50
from email .mime .image import MIMEImage
51
51
from io import open
52
52
from importlib import import_module
53
- from multiprocessing import Pool
53
+ from multiprocessing import Pool , Value
54
54
from socket import gethostname , gethostbyname , getaddrinfo , getfqdn
55
55
from smtplib import (SMTPRecipientsRefused , SMTPHeloError , SMTPSenderRefused ,
56
56
SMTPDataError )
@@ -730,15 +730,23 @@ def _execute(job):
730
730
job .execute ()
731
731
return job .ret
732
732
733
+ def _init_pool_processes (the_val ):
734
+ '''Initialize each process with a global shared variable.
735
+ '''
736
+ global shared_val
737
+ shared_val = the_val
738
+
733
739
734
- def _process_jobs_locally (jobs , max_processes = 1 ):
740
+ def _process_jobs_locally (jobs , max_processes = 1 , shared_val = None ):
735
741
"""
736
742
Local execution using the package multiprocessing, if present
737
743
738
744
:param jobs: jobs to be executed
739
745
:type jobs: list of Job
740
746
:param max_processes: maximal number of processes
741
747
:type max_processes: int
748
+ :param shared_val: shared value for the jobs
749
+ :type shared_val: multiprocessing.Value, optional
742
750
743
751
:return: list of jobs, each with return in job.ret
744
752
:rtype: list of Job
@@ -751,7 +759,7 @@ def _process_jobs_locally(jobs, max_processes=1):
751
759
for job in jobs :
752
760
job .execute ()
753
761
else :
754
- pool = Pool (max_processes )
762
+ pool = Pool (processes = max_processes , initializer = _init_pool_processes , initargs = ( shared_val ,) )
755
763
result = pool .map (_execute , jobs )
756
764
for ret_val , job in zip (result , jobs ):
757
765
job .ret = ret_val
@@ -856,7 +864,7 @@ def _append_job_to_session(session, job, temp_dir=DEFAULT_TEMP_DIR, quiet=True):
856
864
857
865
858
866
def process_jobs (jobs , temp_dir = DEFAULT_TEMP_DIR , white_list = None , quiet = True ,
859
- max_processes = 1 , local = False , require_cluster = False ):
867
+ max_processes = 1 , local = False , require_cluster = False , shared_val = None ):
860
868
"""
861
869
Take a list of jobs and process them on the cluster.
862
870
@@ -879,6 +887,8 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True,
879
887
:param require_cluster: Should we raise an exception if access to cluster
880
888
is not available?
881
889
:type require_cluster: bool
890
+ :param shared_val: A shared value for all of jobs
891
+ :type shared_val: multiprocessing.Value, optional
882
892
883
893
:returns: List of Job results
884
894
"""
@@ -904,7 +914,7 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True,
904
914
# handling of inputs, outputs and heartbeats
905
915
monitor .check (sid , jobs )
906
916
else :
907
- _process_jobs_locally (jobs , max_processes = max_processes )
917
+ _process_jobs_locally (jobs , max_processes = max_processes , shared_val = shared_val )
908
918
909
919
return [job .ret for job in jobs ]
910
920
@@ -943,7 +953,7 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
943
953
interpreting_shell = None , copy_env = True , add_env = None , project = None ,
944
954
validation_level = None , os_distribution = None , os_minor = None , gpu = 0 ,
945
955
h_vmem = None , h_rt = None , resources = None , completion_mail = False ,
946
- require_cluster = False , par_env = DEFAULT_PAR_ENV ):
956
+ require_cluster = False , par_env = DEFAULT_PAR_ENV , shared_val = None ):
947
957
"""
948
958
Maps a function onto the cluster.
949
959
@@ -1016,6 +1026,8 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
1016
1026
:type os_distribution: str
1017
1027
:param os_minor: os minor version that need job to run on machine
1018
1028
:type os_minor: str
1029
+ :param shared_val: A shared value for all the jobs
1030
+ :type shared_val: multiprocessing.Value, optional
1019
1031
1020
1032
:returns: List of Job results
1021
1033
"""
@@ -1036,7 +1048,8 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
1036
1048
white_list = white_list ,
1037
1049
quiet = quiet , local = local ,
1038
1050
max_processes = max_processes ,
1039
- require_cluster = require_cluster )
1051
+ require_cluster = require_cluster ,
1052
+ shared_val = shared_val )
1040
1053
1041
1054
# send a completion mail (if requested and configured)
1042
1055
if completion_mail and SEND_ERROR_MAIL :
0 commit comments