Dear Developers, I have been working on a piece of code development that need your sincere help (three code file attached here).
I use run_slurm.py to use python subprocess module to submit multiple jobs to slurm cluster by invoking a sbatch file with a for loop to reach the following target: 1) create some environmental variables for job_slurm.py to run simulations (values of these environmental variables will change for each job with the for loop) 2) invoke submit_slurm.sh to submit a sbatch job that will run job_slurm.py 3) each job_slurm.py will use python multiprocess.Pool to run parallized simulations on each ./mod_bart.sh exeutable file (will run for a few hours) on a single compute node in cluster using all cores of this compute node I get the following error all the time, could you provide some insights on how our implementation is wrong to achieve the desired goal: Exception in thread Thread-3: Traceback (most recent call last): File "/share/apps/python/2.7/lib/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/share/apps/python/2.7/lib/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/share/apps/python/2.7/lib/python2.7/multiprocessing/pool.py", line 347, in _handle_results task = get() TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())
#!/usr/bin/env python """ this module runs ART simulations in slurm cluster environment, the python multiprocessing module only works on single node so that we will submit the job to compute node with at least 64 cores """ import subprocess import os import shutil def run_art_cluster_slurm(path_to_data_dir, input_param): """ this function will invoke running artn simulations in cluster set up by submitting a job to one compute node, which performs a parallelized art running using multiprocessing module with its maximum cores. Note: the key "num_of_proc" in input setting file will be interpreted as the number of compute nodes when in this cluster mode """ #path_to_input_files = input_param['path_to_input_files'] num_of_proc = input_param["num_of_proc"] central_atom_list = input_param['central_atom_list'] # copy these two files into another dir in path_to_data_dir path_to_submit_slurm = os.path.join(os.environ["MY_ART"], "src/python/ART_wrapper/submit_slurm.sh") path_to_job_slurm = os.path.join(os.environ["MY_ART"], "src/python/ART_wrapper/job_slurm.py") path_to_slurm_output = os.path.join(path_to_data_dir, "slurm") if not os.path.isdir(path_to_slurm_output): os.makedirs(path_to_slurm_output) for full_file_name in [path_to_submit_slurm, path_to_job_slurm]: if os.path.isfile(full_file_name): shutil.copy(full_file_name,path_to_slurm_output) path_to_sh_file = os.path.join(path_to_slurm_output,"submit_slurm.sh") path_to_job_file = os.path.join(path_to_slurm_output,"job_slurm.py") # split the central_atom_list into the num_of_proc folds all_tests_folds = list(split(central_atom_list, num_of_proc)) for test_fold in all_tests_folds: print "current job central atoms list:", test_fold test_fold = str(test_fold).replace(",", "") test_fold = "'%s'"%test_fold subprocess.check_call('export ATOM_LIST=%s;export PATH_TO_JOB=%s;sbatch %s'%(test_fold,path_to_job_file,path_to_sh_file), stdout=subprocess.PIPE, stderr=subprocess.STDOUT,shell=True) #run_bart_node(list_of_run_dir) print "done submitting jobs of running ART simulations using %s of compute nodes!"%num_of_proc def split(a, n): k, m = divmod(len(a), n) return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in xrange(n))
#!/bin/sh #SBATCH -c 32 #SBATCH -N 1 #SBATCH -p main #SBATCH --qos main #SBATCH --mem-per-cpu 6G srun --mpi=pmi2 python $PATH_TO_JOB
import os import subprocess import time import multiprocessing as mp def run_bart_node(path_to_data_dir, list_of_central_atoms): """ this function will be used to submit a job to a compute node. This job will invoke the parallized art running using all cores of this compute node """ list_of_run_dir = [] for central_atom in list_of_central_atoms: path_to_run_dir = os.path.join(path_to_data_dir, str(central_atom)) list_of_run_dir.append(path_to_run_dir) pool = mp.Pool(processes=mp.cpu_count()) pool.map(run_bart_core,list_of_run_dir) pool.close() pool.join() def run_bart_core(path_to_run_dir): os.chdir(path_to_run_dir) subprocess.check_call("chmod +x mod_bart.sh; ./mod_bart.sh", shell=True) print "done runing art for %s"%path_to_run_dir central_atoms = os.environ["ATOM_LIST"][1:-1].split(" ") list_of_central_atoms = map(int, central_atoms) path_to_data_dir = os.environ["DATA_DIR"] start_time = time.time() run_bart_node(path_to_data_dir, list_of_central_atoms) print "total time is", time.time() - start_time
_______________________________________________ Python-Dev mailing list Python-Dev@python.org https://mail.python.org/mailman/listinfo/python-dev Unsubscribe: https://mail.python.org/mailman/options/python-dev/archive%40mail-archive.com