Source code for pull_into_place.pipeline

#!/usr/bin/env python2

"""\
This module defines the Workspace classes that are central to every script.  
The role of these classes is to provide paths to all the data files used in any 
part of the pipeline and to hide the organization of the directories containing 
those files.  The base Workspace class deals with files in the root directory 
of a design.  It's subclasses deal with file in the different subdirectories of 
the design, each of which is related to a cluster job.
"""

import os, re, glob, json, pickle
from klab import scripting
from pprint import pprint

[docs]class Workspace (object): """ Provide paths to every file used in the design pipeline. Each workspace object is responsible for returning paths to files that are relevant to a particular stage of the design pipeline. These files are organized hierarchically: files that are relevant to many parts of the pipeline are stored in the root design directory while files that are relevant to specific stages are stored in subdirectories. You can think of each workspace class as representing a different directory. The Workspace class itself represents the root directory, but it is also the superclass from which all of the other workspace derive. The reason for this is that the root workspace knows where all the shared parameter files are located, and this information is needed in every workspace. When modifying or inheriting from this class, keep in mind two things. First, workspace objects should do little more than return paths to files. There are a few convenience functions that clear directories and things like that, but these are the exception rather than the rule. Second, use the @property decorator very liberally to keep the code that uses this API succinct and easy to read. """ def __init__(self, root): root = os.path.normpath(root) self._root_basename = os.path.basename(root) self._root_dirname = os.path.dirname(root) @classmethod def from_directory(cls, directory): # Force subclasses to reimplement this method if cls != Workspace: raise NotImplementedError return Workspace(directory) @property def parent_dir(self): return self._root_dirname @property def root_dir(self): return os.path.normpath( os.path.join(self._root_dirname, self._root_basename)) @property def abs_root_dir(self): return os.path.abspath(self.root_dir) @property def focus_dir(self): """ The particular directory managed by this class. This is meant to be overridden in subclasses. """ return self.root_dir @property def io_dirs(self): return [] @property def rosetta_dir(self): return self.find_path('rosetta') @property def rosetta_scripts_path(self): pattern = self.rosetta_subpath('source', 'bin', 'rosetta_scripts*') executables = glob.glob(pattern) # Sometimes dead symlinks end up in the `bin/` directory, so explicitly # ignore those. executables = [x for x in executables if os.path.exists(x)] # Print a (hopefully) helpful error message if no ``rosetta_scripts`` # executables are found. if len(executables) == 0: raise PipelineError("""\ No RosettaScripts executable found. Expected to find a file matching '{0}'. Did you forget to compile rosetta? """.format(pattern)) # Sort the ``rosetta_scripts`` executables such that those containing # the word 'release' end up at the front of the list, those containing # 'debug' end up at the back, and shorter file names (which have fewer # weird compilation options) end up in front of longer ones. We'll # ultimately pick the first path in the list, so we're doing our best # to use a basic release mode executable. executables.sort(key=lambda x: len(x)) executables.sort(key=lambda x: 'debug' in x) executables.sort(key=lambda x: 'release' not in x) return executables[0] @property def rosetta_database_path(self): return self.rosetta_subpath('database') def rosetta_subpath(self, *subpaths): return os.path.join(self.rosetta_dir, *subpaths) @property def input_pdb_path(self): return self.find_path('input.pdb.gz') @property def loops_path(self): return self.find_path('loops') @property def resfile_path(self): return self.find_path('resfile') @property def restraints_path(self): return self.find_path('restraints') @property def scorefxn_path(self): return self.find_path('scorefxn.wts') @property def build_script_path(self): return self.find_path('build_models.xml') @property def design_script_path(self): return self.find_path('design_models.xml') @property def validate_script_path(self): return self.find_path('validate_designs.xml') @property def shared_defs_path(self): return self.find_path('shared_defs.xml') @property def flags_path(self): return self.find_path('flags') @property def rsync_url_path(self): return self.find_path('rsync_url') @property def rsync_url(self): if not os.path.exists(self.rsync_url_path): raise UnspecifiedRemoteHost() with open(self.rsync_url_path) as file: return file.read().strip() @property def rsync_recursive_flag(self): return False @property def rsync_include_patterns(self): return [] @property def rsync_exclude_patterns(self): return ['rosetta', 'rsync_url']
[docs] def find_path(self, basename): """ Look in a few places for a file with the given name. If a custom version of the file is found in the directory being managed by this workspace, return it. Otherwise return the path to the default version of the file in the root directory. This function makes it easy to provide custom parameters to any stage to the design pipeline. Just place the file with the custom parameters in the directory associated with that stage. """ custom_path = os.path.join(self.focus_dir, basename) default_path = os.path.join(self.root_dir, basename) return custom_path if os.path.exists(custom_path) else default_path
def check_paths(self): required_paths = [ self.input_pdb_path, self.loops_path, self.resfile_path, self.restraints_path, self.build_script_path, self.design_script_path, self.validate_script_path, self.flags_path, ] for path in required_paths: if not os.path.exists(path): raise PathNotFound(path) def check_rosetta(self): required_paths = [ self.rosetta_database_path, self.rosetta_scripts_path, ] for path in required_paths: if not os.path.exists(path): raise PathNotFound(path) @property def incompatible_with_fragments_script(self): return re.search('[^a-zA-Z0-9_/.]', self.abs_root_dir) def make_dirs(self): scripting.mkdir(self.focus_dir) pickle_path = os.path.join(self.focus_dir, 'workspace.pkl') with open(pickle_path, 'w') as file: pickle.dump(self.__class__, file)
[docs] def cd(self, *subpaths): """ Change the current working directory and update all the paths in the workspace. This is useful for commands that have to be run from a certain directory. """ source = os.path.abspath(self._root_dirname) target = os.path.abspath(os.path.join(*subpaths)) self._root_dirname = os.path.relpath(source, target) os.chdir(target)
def cd_to_root(self): self.cd(self.root_dir) def exists(self): return os.path.exists(self.focus_dir)
[docs]class BigJobWorkspace (Workspace): """ Provide paths needed to run big jobs on the cluster. This is a base class for all the workspaces meant to store results from long simulations (which is presently all of them except for the root). This class provides paths to input directories, output directories, parameters files, and several other things like that. """ @property def input_dir(self): return os.path.join(self.focus_dir, 'inputs') @property def input_paths(self): return glob.glob(os.path.join(self.input_dir, '*.pdb.gz')) def input_path(self, name): return os.path.join(self.input_dir, name) @property def input_names(self): return [os.path.basename(x) for x in self.input_paths] @property def output_dir(self): return os.path.join(self.focus_dir, 'outputs') @property def output_subdirs(self): return [self.output_dir] @property def output_paths(self): return glob.glob(os.path.join(self.input_dir, '*.pdb.gz')) @property def io_dirs(self): return [self.input_dir] + self.output_subdirs @property def stdout_dir(self): return os.path.join(self.focus_dir, 'stdout') @property def stderr_dir(self): return os.path.join(self.focus_dir, 'stderr') @property def rsync_recursive_flag(self): return True @property def rsync_exclude_patterns(self): parent_patterns = super(BigJobWorkspace, self).rsync_exclude_patterns return parent_patterns + ['stderr/', 'stdout/', '*.sc'] def job_params_path(self, job_id): return os.path.join(self.focus_dir, '{0}.json'.format(job_id)) @property def all_job_params_paths(self): return glob.glob(os.path.join(self.focus_dir, '*.json')) @property def all_job_params(self): from . import big_jobs return [big_jobs.read_params(x) for x in self.all_job_params_paths] @property def unclaimed_inputs(self): inputs = set(self.input_names) for params in self.all_job_params: inputs -= set(params['inputs']) return sorted(inputs) def make_dirs(self): Workspace.make_dirs(self) scripting.mkdir(self.input_dir) scripting.mkdir(self.output_dir) scripting.mkdir(self.stdout_dir) scripting.mkdir(self.stderr_dir) def clear_inputs(self): scripting.clear_directory(self.input_dir) def clear_outputs(self): scripting.clear_directory(self.output_dir) scripting.clear_directory(self.stdout_dir) scripting.clear_directory(self.stderr_dir) for path in self.all_job_params_paths: os.remove(path)
[docs]class WithFragmentLibs (object): """ Provide paths needed to interact with fragment libraries. This is a mixin class that provides a handful of paths and features useful for working with fragment libraries. """ @property def fasta_path(self): return os.path.join(self.fragments_dir, 'input.fasta') @property def fragments_dir(self): return os.path.join(self.focus_dir, 'fragments') def fragments_tag(self, input_path): return os.path.basename(input_path)[:4] def fragments_info(self, input_path): # Typically, there is one output directory for each chain that # fragments are being generated for. tag = self.fragments_tag(input_path) frag_map_glob = os.path.join(self.fragments_dir, tag+'?', 'fragment_file_map.json') frag_map = {} for path in glob.glob(frag_map_glob): with open(path) as file: frag_map.update(json.load(file)) # Sort the fragments first by decreasing size of the fragments (because # rosetta insists that the fragment arguments be in this order) and # second alphabetically (for aesthetics). frag_size = lambda x: frag_map[x]['frag_sizes'] frag_paths = sorted(frag_map) frag_paths = sorted(frag_paths, key=frag_size, reverse=True) frag_sizes = [frag_size(x) for x in frag_paths] frag_paths = [os.path.join(self.fragments_dir, x) for x in frag_paths] # If no size-1 fragments were generated, but larger fragments were, # also add the 'none' pseudo-path. This will cause rosetta to make # size-1 fragments from the next largest fragment set. if frag_sizes and frag_sizes[-1] > 1: frag_paths.append('none') frag_sizes.append(1) return frag_paths, frag_sizes def fragments_flags(self, input_path): flags = [] paths, sizes = self.fragments_info(input_path) if paths and sizes: flags.append('-loops:frag_sizes') flags.extend(map(str, sizes)) flags.append('-loops:frag_files') flags.extend(paths) return flags def clear_fragments(self): scripting.clear_directory(self.fragments_dir)
[docs]class RestrainedModels (BigJobWorkspace, WithFragmentLibs): def __init__(self, root): BigJobWorkspace.__init__(self, root) @staticmethod def from_directory(directory): return RestrainedModels(os.path.join(directory, '..')) @property def focus_dir(self): return os.path.join(self.root_dir, '01_restrained_models') @property def input_dir(self): return self.root_dir @property def input_paths(self): return [self.input_pdb_path]
[docs]class FixbbDesigns (BigJobWorkspace): def __init__(self, root, round): BigJobWorkspace.__init__(self, root) self.round = int(round) @staticmethod def from_directory(directory): root = os.path.join(directory, '..') round = int(directory.split('_')[-1]) return FixbbDesigns(root, round) @property def predecessor(self): if self.round == 1: return RestrainedModels(self.root_dir) else: return ValidatedDesigns(self.root_dir, self.round - 1) @property def focus_dir(self): assert self.round > 0 prefix = 2 * self.round subdir = '{0:02}_fixbb_designs_round_{1}'.format(prefix, self.round) return os.path.join(self.root_dir, subdir)
[docs]class ValidatedDesigns (BigJobWorkspace, WithFragmentLibs): def __init__(self, root, round): BigJobWorkspace.__init__(self, root) self.round = int(round) @staticmethod def from_directory(directory): root = os.path.join(directory, '..') round = int(directory.strip('/').split('_')[-1]) return ValidatedDesigns(root, round) @property def predecessor(self): return FixbbDesigns(self.root_dir, self.round) @property def focus_dir(self): assert self.round > 0 prefix = 2 * self.round + 1 subdir = '{0:02}_validated_designs_round_{1}'.format(prefix, self.round) return os.path.join(self.root_dir, subdir) @property def output_subdirs(self): return sorted(glob.glob(os.path.join(self.output_dir, '*/'))) def output_subdir(self, input_name): basename = os.path.basename(input_name[:-len('.pdb.gz')]) return os.path.join(self.output_dir, basename)
def big_job_dir(): return os.path.join(os.path.dirname(__file__), 'big_jobs') def big_job_path(basename): return os.path.join(big_job_dir(), basename)
[docs]def workspace_from_dir(directory, recurse=True): """ Construct a workspace object from a directory name. If recurse=True, this function will search down the directory tree and return the first workspace it finds. If recurse=False, an exception will be raised if the given directory is not a workspace. Workspace identification requires a file called 'workspace.pkl' to be present in each workspace directory, which can unfortunately be a little fragile. """ directory = os.path.abspath(directory) pickle_path = os.path.join(directory, 'workspace.pkl') # Make sure the given directory contains a 'workspace' file. This file is # needed to instantiate the right kind of workspace. if not os.path.exists(pickle_path): if recurse: parent_dir = os.path.dirname(directory) # Keep looking for a workspace as long as we haven't hit the root # of the file system. If an exception is raised, that means no # workspace was found. Catch and re-raise the exception so that # the name of the directory reported in the exception is meaningful # to the user. try: return workspace_from_dir(parent_dir, parent_dir != '/') except WorkspaceNotFound: raise WorkspaceNotFound(directory) else: raise WorkspaceNotFound(directory) # Load the 'workspace' file and create a workspace. with open(pickle_path) as file: workspace_class = pickle.load(file) return workspace_class.from_directory(directory)
[docs]def load_loops(directory, loops_path=None): """ Return a list of tuples indicating the start and end points of the loops that were sampled in the given directory. """ if loops_path is None: workspace = workspace_from_dir(directory) loops_path = workspace.loops_path from klab.rosetta.input_files import LoopsFile loops_parser = LoopsFile.from_filepath(loops_path) # We have to account for some weird indexing behavior in the loops file # parser that I don't really understand. It seems to shrink the loop by # one residue on each side. At first I thought it might be trying to # convert the indices to python indexing, but on second thought I have no # idea what it's trying to do. return [(x-1, y+1) for x, y in loops_parser.get_distinct_segments()]
[docs]def load_resfile(directory, resfile_path=None): """ Return a list of tuples indicating the start and end points of the loops that were sampled in the given directory. """ if resfile_path is None: workspace = workspace_from_dir(directory) resfile_path = workspace.resfile_path from klab.rosetta.input_files import Resfile return Resfile(resfile_path)
def fetch_data(directory, remote_url=None, include_logs=False, dry_run=False): import os, subprocess workspace = workspace_from_dir(directory) # Try to figure out the remote URL from the given directory, if a # particular URL wasn't given. if remote_url is None: remote_url = workspace.rsync_url # Make sure the given directory is actually a directory. (It's ok if it # doesn't exist; rsync will create it.) if os.path.exists(directory) and not os.path.isdir(directory): print "Skipping {}: not a directory.".format(directory) return # Compose an rsync command to copy the files in question. Then either run # or print that command, depending on what the user asked for. rsync_command = [ 'rsync', '-avr', '--exclude', 'rosetta', '--exclude', 'rsync_url', '--exclude', 'core.*', ] if not include_logs: rsync_command += [ '--exclude', 'stdout', '--exclude', 'stderr', '--exclude', '*.sc', ] # This code is trying to combine the remote URL with a directory path. # Originally I was just using os.path.join() to do this, but that caused a # bug when the URL was something like "chef:". This is supposed to specify # a path relative to the user's home directory, but os.path.join() adds a # slash and turns the path into an absolute path. sep = '' if remote_url.endswith(':') else '/' remote_dir = os.path.normpath( remote_url + sep + os.path.relpath(directory, workspace.parent_dir)) rsync_command += [ remote_dir + '/', directory, ] if dry_run: print ' '.join(rsync_command) else: subprocess.call(rsync_command) def fetch_and_cache_data(directory, remote_url=None, include_logs=False): from . import structures fetch_data(directory, remote_url, include_logs) # Don't try to cache anything if nothing has been downloaded yet. if glob.glob(os.path.join(directory, '*.pdb*')): structures.load(directory) def push_data(directory, remote_url=None, dry_run=False): import os, subprocess workspace = workspace_from_dir(directory) if remote_url is None: remote_url = workspace.rsync_url remote_dir = os.path.normpath(os.path.join( remote_url, os.path.relpath(directory, workspace.parent_dir))) rsync_command = [ 'rsync', '-avr', '--exclude', 'rosetta', '--exclude', 'rsync_url', '--exclude', 'stdout', '--exclude', 'stderr', directory + '/', remote_dir, ] if dry_run: print ' '.join(rsync_command) else: subprocess.call(rsync_command)
[docs]class PipelineError (IOError): def __init__(self, message): super(PipelineError, self).__init__(message) self.no_stack_trace = True
[docs]class PathNotFound (PipelineError): def __init__(self, path, *directories): if len(directories) == 0: message = "'{0}' not found.".format(path) elif len(directories) == 1: path = os.path.join(directories[0], path) message = "'{0}' not found.".format(path) else: message = "'{0}' not found. Looked in:".format(path) for directory in directories: message += "\n " + directory PipelineError.__init__(self, message)
[docs]class RosettaNotFound (PipelineError): def __init__(self, workspace): PipelineError.__init__(self, """\ No rosetta checkout found in '{0.root_dir}'. Use the following command to manually create a symlink to a rosetta checkout: $ ln -s /path/to/rosetta/checkout {0.rosetta_dir}""")
[docs]class WorkspaceNotFound (PipelineError): def __init__(self, root): message = "'{0}' is not a workspace.".format(root) PipelineError.__init__(self, message)
[docs]class UnspecifiedRemoteHost (PipelineError): def __init__(self): PipelineError.__init__(self, "No remote host specified.")