Source code for pull_into_place.big_jobs

#!/usr/bin/env python2

import sys, os, re, json, subprocess
from . import pipeline

def submit(script, workspace, **params):
    """Submit a job with the given parameters.

    The job array is handed to ``qsub`` in the hold state, the job's
    parameters are written as JSON to ``workspace.job_params_path(job_id)``,
    and only then is the hold released with ``qrls``.  Holding the job first
    means the params file exists before any task can start.

    Parameters
    ----------
    script:
        Passed to ``pipeline.big_job_path()`` to locate the script to run.
    workspace:
        Object providing ``rosetta_dir``, ``stdout_dir``, ``stderr_dir``,
        ``focus_dir`` and ``job_params_path()``.
    **params:
        Job parameters.  Entries whose value is None are dropped.  The keys
        read here are ``test_run`` (default False), ``nstruct``,
        ``max_runtime`` (default '6:00:00') and ``max_memory`` (default '1G').
        Everything that remains is written to the params file as passed; the
        test-run overrides below only affect the qsub command line.

    Raises
    ------
    pipeline.RosettaNotFound
        If ``workspace.rosetta_dir`` does not exist.
    TypeError
        If ``nstruct`` is not given and this is not a test run.
    SystemExit
        With exit status 1, after printing the qsub output, if that output is
        not the expected "job-array ... has been submitted" message.
    """
    # NOTE(review): `cluster` is not used in this function; the import is left
    # untouched in case it matters at import time -- confirm before removing.
    from klab import cluster, process

    # Make sure the rosetta symlink has been created.
    if not os.path.exists(workspace.rosetta_dir):
        raise pipeline.RosettaNotFound(workspace)

    # Parse some job parameters for the keyword arguments.
    params = dict((k, v) for k, v in params.items() if v is not None)
    test_run = params.get('test_run', False)
    nstruct = params.get('nstruct')
    max_runtime = params.get('max_runtime', '6:00:00')
    max_memory = params.get('max_memory', '1G')

    # Test runs use a fixed, small job size and a short time limit.
    if test_run:
        nstruct = 50
        max_runtime = '0:30:00'

    if nstruct is None:
        raise TypeError("submit() requires the keyword argument 'nstruct' for production runs.")

    # Submit the job and put it immediately into the hold state.
    qsub_command = (
        'qsub', '-h', '-cwd',
        '-o', workspace.stdout_dir,
        '-e', workspace.stderr_dir,
        '-t', '1-{0}'.format(nstruct),
        '-l', 'h_rt={0}'.format(max_runtime),
        '-l', 'mem_free={0}'.format(max_memory),
        pipeline.big_job_path(script),
        workspace.focus_dir,
    )
    status = process.check_output(qsub_command)

    # The job id is followed by a literal '.' and the task range, so the dot
    # is escaped rather than left to match any character.
    status_pattern = re.compile(r'Your job-array (\d+)\.[0-9:-]+ \(".*"\) has been submitted')
    status_match = status_pattern.match(status)

    if not status_match:
        # Show whatever qsub said and report the failure through a non-zero
        # exit status, so calling scripts can tell the submission failed.
        print(status)
        sys.exit(1)

    # Figure out the job id, then make a params file specifically for it.
    job_id = status_match.group(1)

    with open(workspace.job_params_path(job_id), 'w') as params_file:
        json.dump(params, params_file)

    # Release the hold on the job.
    qrls_command = 'qrls', job_id
    process.check_output(qrls_command)

    # Echo qsub's message as-is, without adding another newline.
    sys.stdout.write(status)
def initiate():
    """Return some relevant information about the currently running job.

    The workspace is built from the first command-line argument and the
    process changes into the workspace root before anything else is read.
    The job id comes from the ``JOB_ID`` environment variable and the task id
    is ``SGE_TASK_ID`` minus one.

    Returns
    -------
    tuple
        ``(workspace, job_id, task_id, job_params)`` where ``job_params`` is
        the content of the params file at ``workspace.job_params_path(job_id)``.
    """
    workspace = pipeline.workspace_from_dir(sys.argv[1])
    workspace.cd_to_root()

    env = os.environ
    job_id = int(env['JOB_ID'])
    task_id = int(env['SGE_TASK_ID']) - 1

    # Resolve the params path only after cd_to_root(), as the original does.
    params_path = workspace.job_params_path(job_id)
    return workspace, job_id, task_id, read_params(params_path)
def read_params(params_path):
    """Load and return the JSON-encoded job parameters stored at *params_path*."""
    with open(params_path) as params_file:
        return json.load(params_file)


def print_debug_info():
    """Print the date, host name and invoking command of the current task.

    Reads ``JOB_ID`` and ``SGE_TASK_ID`` from the environment (a missing
    variable raises KeyError) and flushes stdout afterwards.
    """
    from datetime import datetime
    from socket import gethostname

    print("Date: {0}".format(datetime.now()))
    print("Host: {0}".format(gethostname()))
    print("Command: JOB_ID={0[JOB_ID]} SGE_TASK_ID={0[SGE_TASK_ID]} {1}".format(
        os.environ, ' '.join(sys.argv)))
    # print("") rather than print(): under Python 2's print statement the
    # latter would emit "()" instead of a blank line.
    print("")
    sys.stdout.flush()


def run_command(command):
    """Run *command* as a child process and wait for it to finish.

    The working directory, the command line and the child's process id are
    printed first.  Stdout is flushed before the child is started and again
    before waiting on it.

    Parameters
    ----------
    command:
        Sequence of strings; it is joined with spaces for display and passed
        unchanged to ``subprocess.Popen``.

    Returns
    -------
    int
        The child's exit status, so callers can detect a failed command.
        Earlier versions discarded it and returned None.
    """
    print("Working directory: {0}".format(os.getcwd()))
    print("Command: {0}".format(' '.join(command)))
    sys.stdout.flush()

    child = subprocess.Popen(command)

    print("Process ID: {0}".format(child.pid))
    print("")
    sys.stdout.flush()

    return child.wait()