Commit 2daf2061 authored by Valentin Pelloin's avatar Valentin Pelloin
Browse files

second version of slurm execute seems to work

parent b6181c60
#!/usr/bin/env python3
import os
import re
import sys
import stat
import time
import signal
import argparse
import subprocess
from multiprocessing import Process
def get_default_jobname():
return os.popen('ps -o comm= $PPID').read().strip()
def get_jobid(command_file):
return int(re.match("\.slurm_batch_script_job_(\d+).sh", command_file).group(1))
def get_job_status(jobid):
return os.popen('sacct --noheader --parsable2 -j {} | head -n 1 | cut -d "|" -f 6'.format(jobid)).read().strip()
def add_line_file(file, line_i, line_content):
with open(file, "r") as f:
contents = f.readlines()
contents.insert(line_i, line_content + "\n")
with open(file, "w") as f:
contents = "".join(contents)
f.write(contents)
def print_box(string):
l = len(string) + 4
print("=" * l)
print("=", string, "=")
print("=" * l)
parser = argparse.ArgumentParser(
description='Execute a bunch of commands on SLURM.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument('--mem', type=str, default="5G", help="Memory")
parser.add_argument('--p', type=str, default="cpu", help="Partition")
parser.add_argument('--w', type=str, default=None, help="Node(s)")
parser.add_argument('--c', type=int, default=1, help="Number of cpus per tasks")
parser.add_argument('--N', type=int, default=1, help="Minimum allocated nodes")
parser.add_argument('--gres', type=str, default=None, help="List of generic consumable resources")
parser.add_argument('--time', type=str, default="01-00", help="Time limit")
parser.add_argument('--ranks', type=str, default="", help="Exporting rank")
parser.add_argument('--job-name', type=str, default=get_default_jobname(), help="Job name")
parser.add_argument('--max-parallel', type=int, default=10, help="In case of parallel jobs, max jobs to run simultaneously")
parser.add_argument('--repeat-worker', type=int, default=1, help="Repeat command if only one command file given")
parser.add_argument(
'command_files',
type=str,
nargs='+',
help='List of command files to execute on SLURM'
)
args = parser.parse_args()
n_command_files = len(args.command_files)
if args.repeat_worker > 1:
assert len(args.command_files) == 1
global jobs_to_kill
global files_to_print
global files_to_rm
global exit_code
jobs_to_kill = []
files_to_print = []
files_to_rm = []
exit_code = None
def cleanup():
print(files_to_rm)
for j in jobs_to_kill:
os.popen('scancel {}'.format(j))
for i, file in enumerate(files_to_print, start=1):
print("")
print_box("BEGIN {}: Output file {}".format(i, file))
try:
with open(file, 'r') as f:
print(f.read(), end='')
except FileNotFoundError:
pass
print_box("END {}: Output file {}".format(i, file))
print("")
for file in files_to_rm:
try:
os.remove(file)
except FileNotFoundError:
print("Info: cannot remove {} as it does not exists.".format(file))
def exec(command, options, fixed_options):
line = [command]
line.extend([
'--mem', args.mem,
'--time', args.time,
'--job-name', args.job_name,
'-p', str(args.p),
'-c', str(args.c),
'-N', str(args.N)
])
line.extend(options)
line.append(fixed_options)
p = subprocess.Popen(line, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for command_file in args.command_files:
files_to_rm.append(command_file)
if command == 'srun':
print_box("BEGIN of job")
for line in p.stdout:
print(line.decode('utf-8'), end='')
print_box("END of job")
elif command == 'sbatch':
jobid = int(p.stdout.readline().strip())
jobs_to_kill.append(jobid)
if args.repeat_worker == 1:
for command_file in args.command_files:
output_f = 'slurm-{}_{}.out'.format(jobid, get_jobid(command_file))
files_to_print.append(output_f)
files_to_rm.append(output_f)
else:
for i in range(1, 1 + args.repeat_worker):
output_f = 'slurm-{}_{}.out'.format(jobid, i)
files_to_print.append(output_f)
files_to_rm.append(output_f)
print("Using SLURM job id {}.".format(jobid))
global exit_code
exit_code = p.wait()
cleanup()
def receiveSignal(signal_number, frame):
cleanup()
exit(signal_number)
def register_signals():
signal.signal(signal.SIGHUP, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
signal.signal(signal.SIGQUIT, receiveSignal)
signal.signal(signal.SIGILL, receiveSignal)
signal.signal(signal.SIGTRAP, receiveSignal)
signal.signal(signal.SIGABRT, receiveSignal)
signal.signal(signal.SIGBUS, receiveSignal)
signal.signal(signal.SIGFPE, receiveSignal)
signal.signal(signal.SIGUSR1, receiveSignal)
signal.signal(signal.SIGSEGV, receiveSignal)
signal.signal(signal.SIGUSR2, receiveSignal)
signal.signal(signal.SIGPIPE, receiveSignal)
signal.signal(signal.SIGALRM, receiveSignal)
signal.signal(signal.SIGTERM, receiveSignal)
register_signals()
if n_command_files > 1:
job_ids = [str(get_jobid(command_file)) for command_file in args.command_files]
print("Starting {} jobs on SLURM..".format(n_command_files))
exec("sbatch", [
"--wait",
"--parsable",
"--array", "{}%{}".format(",".join(job_ids), args.max_parallel)
], "slurm_array_job_start")
elif args.repeat_worker > 1:
print("Starting {} times the same worker on SLURM...".format(args.repeat_worker))
exec("sbatch", [
"--wait",
"--parsable",
"--array", "1-{}%{}".format(args.repeat_worker, args.max_parallel)
], args.command_files[0])
elif n_command_files == 1:
print("Starting 1 job on SLURM...")
os.chmod(args.command_files[0], 0o755)
for i, rank in enumerate(args.ranks.split()):
add_line_file(args.command_files[0], 3, "if test $SLURM_NODEID = {} ; then export RANK={}; fi".format(i, rank))
add_line_file(args.command_files[0], 3, "export MASTER_HOST=$(get_master)")
exec("srun", ["--quit-on-interrupt"], args.command_files[0])
exit(exit_code)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment