Dear Developers,
I have been working on a piece of code that needs your help
(three code files are attached here).

I use run_slurm.py, with the Python subprocess module, to submit multiple jobs
to a Slurm cluster by invoking an sbatch file in a for loop, with the following
goals:

1) create some environmental variables for job_slurm.py to run simulations 
(values of these environmental variables will change for each job with the for 
loop)

2) invoke submit_slurm.sh to submit a sbatch job that will run job_slurm.py

3) each job_slurm.py will use Python multiprocessing.Pool to run parallelized 
simulations of the ./mod_bart.sh executable file (each will run for a few hours) 
on a single compute node in the cluster, using all cores of that node

I get the following error every time; could you provide some insight into 
what is wrong with our implementation?


Exception in thread Thread-3:
Traceback (most recent call last):
  File "/share/apps/python/2.7/lib/python2.7/threading.py", line 552, in 
__bootstrap_inner
    self.run()
  File "/share/apps/python/2.7/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/share/apps/python/2.7/lib/python2.7/multiprocessing/pool.py", line 
347, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 
'subprocess.CalledProcessError'>, ())


#!/usr/bin/env python
"""
this module runs ART simulations in slurm cluster environment,
the python multiprocessing module only works on single node so that
we will submit the job to compute node with at least 64 cores
"""
import subprocess
import os
import shutil
def run_art_cluster_slurm(path_to_data_dir, input_param):
	"""
	this function will invoke running artn simulations in cluster set up by 
	submitting a job to one compute node, which performs a parallelized
	art running using multiprocessing module with its maximum cores.
	
	Note:
		the key "num_of_proc" in input setting file will be interpreted as 
	the number of compute nodes when in this cluster mode
	"""
	#path_to_input_files = input_param['path_to_input_files']
	num_of_proc = input_param["num_of_proc"]
	central_atom_list = input_param['central_atom_list']
	
	# copy these two files into another dir in path_to_data_dir
	path_to_submit_slurm = os.path.join(os.environ["MY_ART"], "src/python/ART_wrapper/submit_slurm.sh")
	path_to_job_slurm = os.path.join(os.environ["MY_ART"], "src/python/ART_wrapper/job_slurm.py")
	path_to_slurm_output = os.path.join(path_to_data_dir, "slurm")
	if not os.path.isdir(path_to_slurm_output):
		os.makedirs(path_to_slurm_output)
	
	for full_file_name in [path_to_submit_slurm, path_to_job_slurm]:
		if os.path.isfile(full_file_name):
			shutil.copy(full_file_name,path_to_slurm_output)
	
	path_to_sh_file = os.path.join(path_to_slurm_output,"submit_slurm.sh")
	path_to_job_file = os.path.join(path_to_slurm_output,"job_slurm.py")
	# split the central_atom_list into the num_of_proc folds
	all_tests_folds = list(split(central_atom_list, num_of_proc))
	for test_fold in all_tests_folds:
		print "current job central atoms list:", test_fold
		test_fold = str(test_fold).replace(",", "")
		test_fold = "'%s'"%test_fold		
		subprocess.check_call('export ATOM_LIST=%s;export PATH_TO_JOB=%s;sbatch %s'%(test_fold,path_to_job_file,path_to_sh_file), stdout=subprocess.PIPE, stderr=subprocess.STDOUT,shell=True)
		#run_bart_node(list_of_run_dir)
	print "done submitting jobs of running ART simulations using %s of compute nodes!"%num_of_proc
	
def split(a, n):
    """
    Split sequence *a* into *n* contiguous folds whose lengths differ by
    at most one: the first ``len(a) % n`` folds each receive one extra
    element.  Returns a generator of slices; ``list(split(a, n))``
    materializes all folds (short folds may be empty when n > len(a)).
    """
    k, m = divmod(len(a), n)
    # fold i covers the half-open slice [i*k + min(i, m), (i+1)*k + min(i+1, m))
    # range() (not py2-only xrange) behaves identically here and keeps the
    # helper portable across Python 2 and 3.
    return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
#!/bin/sh
# Slurm batch wrapper submitted by run_slurm.py via sbatch.
# Requests a single whole node (32 cores); the python job then fans out
# over those cores with multiprocessing.Pool.
#SBATCH -c 32
#SBATCH -N 1
#SBATCH -p main
#SBATCH --qos main
#SBATCH --mem-per-cpu 6G
# $PATH_TO_JOB (path to job_slurm.py) is exported by the submitting
# process; sbatch propagates the submission environment to the job.
# NOTE(review): --mpi=pmi2 sets up an MPI launch environment, but
# job_slurm.py uses multiprocessing rather than MPI -- presumably
# harmless here, but confirm it is intended.
srun --mpi=pmi2 python $PATH_TO_JOB
import os
import subprocess
import time
import multiprocessing as mp

def run_bart_node(path_to_data_dir, list_of_central_atoms):
	"""
	run ART for every central atom in list_of_central_atoms in parallel,
	one Pool worker per core of the current compute node.
	
	Each central atom id maps to the run directory
	<path_to_data_dir>/<central_atom>, which is expected to contain a
	mod_bart.sh executable (see run_bart_core).  Blocks until all runs
	finish.
	"""
	list_of_run_dir = [os.path.join(path_to_data_dir, str(central_atom))
	                   for central_atom in list_of_central_atoms]
	pool = mp.Pool(processes=mp.cpu_count())
	try:
		# map() blocks until every run directory has been processed
		pool.map(run_bart_core, list_of_run_dir)
	finally:
		# The original indented close()/join() with spaces inside a
		# tab-indented file (an IndentationError waiting to happen);
		# also make sure the workers are reaped even if map() raises.
		pool.close()
		pool.join()
def run_bart_core(path_to_run_dir):
	os.chdir(path_to_run_dir)
	subprocess.check_call("chmod +x mod_bart.sh; ./mod_bart.sh", shell=True)
	print "done runing art for %s"%path_to_run_dir

central_atoms = os.environ["ATOM_LIST"][1:-1].split(" ")
list_of_central_atoms = map(int, central_atoms)
path_to_data_dir = os.environ["DATA_DIR"]

start_time = time.time()
run_bart_node(path_to_data_dir, list_of_central_atoms)
print "total time is", time.time() - start_time
_______________________________________________
Python-Dev mailing list
Python-Dev@python.org
https://mail.python.org/mailman/listinfo/python-dev
Unsubscribe: 
https://mail.python.org/mailman/options/python-dev/archive%40mail-archive.com

Reply via email to