# -*- coding: utf-8 -*-
# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2018-2019 Tomasz Magdziarz, Michał Banas <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from aquaduct import logger
from aquaduct.utils.helpers import version_parser
import MDAnalysis as mda
################################################################################
# Check MDAnalysis version
def mda_ver():
return version_parser(mda.__version__)
# FIXME: do it according to user options
if mda_ver() < version_parser('0.16'):
logger.error('Unsupported MDAnalysis version %s; should be 0.16.2 or > 0.19.', mda.__version__)
raise NotImplementedError('Unsupported MDAnalysis version %s; should be 0.16.2 or > 0.19.' % mda.__version__)
if mda_ver() < version_parser('0.17') and mda_ver() < version_parser('0.20'):
logger.warning('Unsupported MDAnalysis version %s.', mda.__version__)
logger.warning('Trying to mitigate potential problems by setting `use_periodic_selections = False`.')
mda.core.flags['use_periodic_selections'] = False
################################################################################
# rest of imports
import re
from os.path import splitext
from os import pathsep
from collections import OrderedDict, namedtuple
import numpy as np
from MDAnalysis.topology.core import guess_atom_element
from aquaduct.utils.helpers import SmartRange, SmartRangeIncrement
from aquaduct.geom.convexhull import SciPyConvexHull, are_points_within_convexhull
from aquaduct.utils.helpers import arrayify, create_tmpfile, tupleify
from aquaduct.utils.maths import make_default_array
from aquaduct.apps.data import GCS, CRIC
from aquaduct.utils.maths import defaults
################################################################################
# import or create memory decorator
if GCS.cachedir:
from joblib import Memory
memory_cache = Memory(cachedir=GCS.cachedir,
verbose=0)
# mmap have to be switched off, otherwise smoothing does not work properly
# memory_cache = Memory(cachedir=GCS.cachedir, mmap_mode='r', verbose=0)
memory = memory_cache.cache
elif GCS.cachemem:
from aquaduct.utils.helpers import memory_in_memory as memory
else:
from aquaduct.utils.helpers import noaction as memory
################################################################################
# trajectory window object
class Window(object):
def __init__(self, start, stop, step):
self.start = self._none_or_int(start)
self.stop = self._none_or_int(stop)
self.step = self._none_or_int(step)
@staticmethod
def _none_or_int(nr):
if nr is not None:
return int(nr)
def __repr__(self):
return "Window(%r:%r:%r)" % (self.start, self.stop, self.step)
def correct(self, real_frame_no):
if self.start is None:
self.start = 0
elif self.start < 0:
self.start = 0
elif self.start > real_frame_no:
self.start = real_frame_no - 1
if self.stop is None:
self.stop = real_frame_no - 1
elif self.stop < 0:
self.stop = 0
elif self.stop > real_frame_no:
self.stop = real_frame_no - 1
if self.step is None:
self.step = 1
elif self.step < 0:
self.step = 1
def range(self, reverse=False):
# returns range object
if reverse:
return range(self.stop, self.start - 1, -1 * self.step)
return range(self.start, self.stop + 1, self.step)
def get_real(self, frame):
# recalculates real frame
return self.start + frame * self.step
def len(self):
# lenght of window
return len(self.range())
def split(self, slices=None):
assert slices > 0
N = int(max(1, np.floor(self.len() / float(slices))))
for start in list(self.range())[::(N + 1)]:
yield Window(start, min(self.stop, start + (N) * self.step), self.step)
################################################################################
# MasterReader
# engine problem
# Two options are currently available:
# MDAnalysis
# MDTraj
[docs]class OpenReaderTraj(namedtuple('OpenReaderTraj', 'topology trajectory number window engine')):
def open(self):
# convenience function
return open_traj_reader(self)
class MasterReader(object):
# only one MasterReader object can be used
# it does not use ANY direct call to ANY MD access software
open_reader_traj = {}
topology = ['']
trajectory = ['']
window = None # this is full window
sandwich_mode = None
engine_name = 'mda'
threads = 1
threads_multiply = 1
edges = []
def __del__(self):
self.reset()
def __call__(self, topology, trajectory, window=None, sandwich=False, threads=1):
"""
:param list topology: List of topologies. Each element is a file name.
:param list trajectory: List of trajectories. Each element is a file name.
:param Window window: Frames window to read.
:param bool sandwich: Flag for setting sandwich mode.
If no sandiwch mode is used, number of topologies has to be precisely 1.
In sandwich mode it can be either 1 or equal to the number of trajectory files.
"""
if not isinstance(topology, list):
topology = [t.strip() for t in topology.split(pathsep)]
self.topology = topology
if not isinstance(trajectory, list):
trajectory = [t.strip() for t in trajectory.split(pathsep)]
self.trajectory = trajectory
self.window = window
self.sandwich_mode = sandwich
if len(self.topology) > 1:
assert self.sandwich_mode, "Multiple topologies possible in sandwich mode only."
assert len(self.topology) == len(
self.trajectory), "Number of topologies must be 1 or be equal to number of trajectories."
self.window.correct(self.real_number_of_frames()) # this corrects window
self.reset() # assert window correction clear all opened trajs
self.threads = threads
def reset(self):
self.open_reader_traj = {}
'''
def __getstate__(self):
# if pickle dump is required, this will not be used in the future
# do not pass open_reader_traj
return dict(((k, v) for k, v in self.__dict__.items() if k not in ['open_reader_traj']))
def __setstate__(self, state):
# if pickle dump is required, this will not be used in the future
self.__dict__ = state
self.open_reader_traj = {}
'''
def getrecallstate(self):
# TODO: to be removed?
return dict(topology=self.topology,
trajectory=self.trajectory,
window=self.window,
sandwich=self.sandwich_mode,
threads=self.threads)
'''
@property
def engine(self):
# returns engine used for accessing trajectory
# this returns object that inherits from ReaderTraj
if self.engine_name == 'mda':
return ReaderTrajViaMDA
raise NotImplementedError
'''
def engine(self, topology, trajectory, number, window):
'''
:param str topology: Topology file name.
:param list trajectory: List of trajectories. Each element is a file name. Alternatively, trajectory file name.
:param int number: Number of the layer.
:param Window window: Frames window.
:return:
'''
assert self.engine_name in ['mda']
return OpenReaderTraj(topology, trajectory, number, window, self.engine_name)
def __repr__(self):
sandwich = ''
if self.sandwich_mode:
sandwich = ',sandwich'
return "Reader(%s,%s,%r%s)" % (
"[%s]" % (','.join(self.topology)), "[%s]" % (','.join(self.trajectory)), self.window, sandwich)
def sandwich(self, number=False):
# generates readers of consecutive sandwich layers
for nr, trajectory in enumerate(self.trajectory):
if number:
yield nr, self.get_single_reader(nr)
else:
yield self.get_single_reader(nr)
def baguette(self, number=False):
# generates reader of trajectory file(s)
if number:
yield 0, self.get_single_reader(0)
else:
yield self.get_single_reader(0)
def strata(self, number=False):
# generates slices of baquette
for nr in range(self.number_of_layers()):
if number:
yield nr + 1, self.get_single_reader(nr + 1)
else:
yield self.get_single_reader(nr + 1)
def iterate(self, number=False, threads=True):
# iterates over trajectory readers
# calls sandwich or baguette
if self.sandwich_mode:
return self.sandwich(number=number)
elif threads and self.threads > 1:
return self.strata(number=number)
return self.baguette(number=number)
def get_single_raw_reader_per_trajectory(self, number):
# here number is a number of trajectory, starting from 0
if len(self.topology) == len(self.trajectory):
return self.engine(self.topology[number],
[self.trajectory[number]],
number=number,
window=Window(0, None, 1))
else:
return self.engine(self.topology[0],
[self.trajectory[number]],
number=number,
window=Window(0, None, 1))
def get_single_reader(self, number):
# returns single trajectory reader of number
# is it is already opened it is returned directly
# if not is opened and then recursive call is executed
# print "GetSingleReader(%r)" % (number,)
if self.sandwich_mode:
if len(self.topology) == 1:
return self.engine(self.topology[0], self.trajectory[number], number=number, window=self.window)
else:
return self.engine(self.topology[number], self.trajectory[number], number=number, window=self.window)
elif number > 0 and self.threads > 1:
window = list(self.window.split(self.number_of_layers()))[number - 1]
return self.engine(self.topology[0], self.trajectory, number=number, window=window)
else:
assert number == 0, "Sandwich/baguette mismatch. Try use/not use --sandwich option."
return self.engine(self.topology[0], self.trajectory, number=0, window=self.window)
def get_reader_by_id(self, someid):
# returns trajectory reader of number in someid
# someid is tuple (number,ix)
return self.get_single_reader(someid[0])
def real_number_of_frames(self):
# returns number of frames in traj files
# in sandwich it returns number of frames in first layer
# in baguette it returns total number of frames (so it is also number of frames in layer 0)
return open_traj_reader(self.get_single_reader(0)).real_number_of_frames()
def get_edges(self):
# should return list of edges of trajectory
# if one trajectory file is used there are two edges begin and end
# for each additional trajectory file additional edge marking joining point is returned
# make sense in non sandwich mode only
# reader has to be reset before and after
self.reset()
# to calculate this each individual file should be open as separate trajectory
# 1. get real number of frames for each part
rnf = [0]
for number in range(len(self.trajectory)):
# if len(rnf)==0:
# rnf.append(0)
# else:
# rnf.append(rnf[-1]+1)
# rnf.append(rnf[-1]-1+open_traj_reader(self.get_single_raw_reader_per_trajectory(number)).real_number_of_frames())
rnf.append(open_traj_reader(self.get_single_raw_reader_per_trajectory(number)).real_number_of_frames())
# cumulative sum
rnf = (np.cumsum(rnf) - 1).tolist()
# 2. by using window try to calculate where are the edges
E = [] # list of edges
for nr, rf in enumerate(self.window.range()): # iterate over real frames
if rf >= rnf[0]:
# this is an edge
if self.window.step > 1:
E.append(nr - 1)
else:
E.append(nr)
while len(rnf) and rf >= rnf[0]:
rnf.pop(0)
if len(rnf):
E.append(nr)
self.reset()
return E[1:-1]
def number_of_frames(self, onelayer=False):
# number of frames in the window times number of layers
# in baguette number of layers is 1
if self.sandwich_mode and not onelayer:
return len(self.trajectory) * self.window.len()
return self.window.len()
def number_of_layers(self):
if self.sandwich_mode:
return len(self.trajectory)
nof = self.number_of_frames()
nol = self.threads * self.threads_multiply
return nol if nof > nol else max(nof - 1, 1)
# instance of MasterReader
Reader = MasterReader()
def open_traj_reader_engine(ort):
if ort.engine == 'mda':
return ReaderTrajViaMDA(ort.topology, ort.trajectory, number=ort.number, window=ort.window)
raise NotImplementedError
def open_traj_reader(ort):
if ort.number not in Reader.open_reader_traj:
Reader.open_reader_traj.update({ort.number: open_traj_reader_engine(ort)})
return Reader.open_reader_traj[ort.number]
class ReaderAccess(object):
# ReaderAccess class provides reader property that returns current instance of MasterReader
def get_reader(self, number):
if number in Reader.open_reader_traj:
# print "Getting reader",number,"from opened readers."
return Reader.open_reader_traj[number]
Reader.get_single_reader(number).open()
return self.get_reader(number)
def get_reader_by_id(self, someid):
return self.get_reader(someid[0])
def get_edges(self):
return Reader.edges
################################################################################
# VdW radii
VdW_radii = {'ac': 2.47,
'ag': 2.11,
'al': 1.84,
'am': 2.44,
'ar': 1.88,
'as': 1.85,
'at': 2.02,
'au': 2.14,
'b': 1.92,
'ba': 2.68,
'be': 1.53,
'bi': 2.07,
'bk': 2.44,
'br': 1.85,
'c': 1.7,
'ca': 2.31,
'cd': 2.18,
'ce': 2.42,
'cf': 2.45,
'cl': 1.75,
'cm': 2.45,
'co': 2.0,
'cr': 2.06,
'cs': 3.43,
'cu': 1.96,
'dy': 2.31,
'er': 2.29,
'es': 2.45,
'eu': 2.35,
'f': 1.47,
'fe': 2.04,
'fm': 2.45,
'fr': 3.48,
'ga': 1.87,
'gd': 2.34,
'ge': 2.11,
'h': 1.1,
'he': 1.4,
'hf': 2.23,
'hg': 2.23,
'ho': 2.3,
'i': 1.98,
'in': 1.93,
'ir': 2.13,
'k': 2.75,
'kr': 2.02,
'la': 2.43,
'li': 1.82,
'lr': 2.46,
'lu': 2.24,
'md': 2.46,
'mg': 1.73,
'mn': 2.05,
'mo': 2.17,
'n': 1.55,
'na': 2.27,
'nb': 2.18,
'nd': 2.39,
'ne': 1.54,
'ni': 1.97,
'no': 2.46,
'np': 2.39,
'o': 1.52,
'os': 2.16,
'p': 1.8,
'pa': 2.43,
'pb': 2.02,
'pd': 2.1,
'pm': 2.38,
'po': 1.97,
'pr': 2.4,
'pt': 2.13,
'pu': 2.43,
'ra': 2.83,
'rb': 3.03,
're': 2.16,
'rh': 2.1,
'rn': 2.2,
'ru': 2.13,
's': 1.8,
'sb': 2.06,
'sc': 2.15,
'se': 1.9,
'si': 2.1,
'sm': 2.36,
'sn': 2.17,
'sr': 2.49,
'ta': 2.22,
'tb': 2.33,
'tc': 2.16,
'te': 2.06,
'th': 2.45,
'ti': 2.11,
'tl': 1.96,
'tm': 2.27,
'u': 2.41,
'v': 2.07,
'w': 2.18,
'xe': 2.16,
'y': 2.32,
'yb': 2.26,
'zn': 2.01,
'zr': 2.23}
'''
Dictionary of VdW radii.
Data taken from L. M. Mentel, mendeleev, 2014. Available at: https://bitbucket.org/lukaszmentel/mendeleev.
Package **mendeleev** is not used because it depends on too many other libraries.
'''
################################################################################
# Reader Traj base class
# class ReaderTraj(ReaderAccess):
class ReaderTraj(object):
def __init__(self, topology, trajectory,
number=None, window=None):
"""
:param str topology: Topology file name.
:param list trajectory: Trajectory file name.
:param int number: Number of trajectory file.
:param Window window: Frames window to read.
:param Reader reader: Parent reader object.
This is base class for MD data access engines.
"""
self.topology = topology # here, topology is only one file
self.trajectory = trajectory
if not isinstance(trajectory, list):
self.trajectory = [t.strip() for t in trajectory.split(pathsep)]
if not topology or (isinstance(topology,str) and len(topology.strip())==0):
raise TypeError('Empty topology file')
# print "ReaderTraj(%r,%r)" % (self.topology,self.trajectory)
self.number = number
self.window = window
self.real_frame_nr = None
self.trajectory_object = self.open_trajectory()
def __repr__(self):
if len(self.trajectory) == 1:
return "%s(%s,%s,%d,%r)" % (
self.__class__.__name__, self.topology, self.trajectory[0], self.number, self.window)
return "%s(%s,%s,%d,%r)" % (
self.__class__.__name__, self.topology, "[%s]" % (','.join(self.trajectory)), self.number, self.window)
def iterate_over_frames(self, reverse=False):
window_len = self.window.len()
for frame, real_frame in enumerate(self.window.range(reverse=reverse)):
self.set_real_frame(real_frame)
if reverse:
yield window_len - frame - 1
else:
yield frame
def iterate(self, reverse=False):
return self.iterate_over_frames(reverse=reverse)
def set_frame(self, frame):
return self.set_real_frame(self.window.get_real(frame))
def dump_frames(self, frames, selection=None, filename=None):
if filename is None:
filename = create_tmpfile(ext='pdb')
self.dump_frames_to_file(frames, selection, filename)
return filename
def __del__(self):
self.close_trajectory()
def number_of_frames(self):
# should return number of frames in this reader (window)
return self.window.len()
def open_trajectory(self):
# should return any object that can be further used to parse trajectory
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def close_trajectory(self):
# should close trajectory reader in self.trajectory_object
# WARNING: This method has to be carefully implemented because it is used by __del__ and
# should not emmit any error messages. This is subjet of change.
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def set_real_frame(self, real_frame):
self.real_frame_nr = real_frame
# sets real frame
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def real_number_of_frames(self):
# should return number of frames
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def parse_selection(self, selection):
# should return selection
# selection resolution should be atoms
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def select_all(self):
# should return selection of all atoms
# selection resolution should be atoms
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def atom_vdw(self, atomid):
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def atom2residue(self, atomid):
# one atom id to one residue id
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def atoms_positions(self, atomids):
# atoms ids to coordinates
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def residues_positions(self, resids):
# residues ids to center of masses coordinates
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def residues_names(self, resids):
# residues ids to center of masses coordinates
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def atoms_masses(self, atomids):
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def dump_frames_to_file(self, frames, selection, filename):
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
################################################################################
# ReaderTraj engine MDAnalysis
mda_available_formats = {re.compile('(nc|NC)'): 'nc',
re.compile('(prmtop|parmtop|top|PRMTOP|PARMTOP|TOP)'): 'PRMTOP',
re.compile('(dcd|DCD)'): 'LAMMPS',
re.compile('(psf|PSF)'): 'psf',
re.compile('(pdb|PDB)'): 'pdb',
re.compile('(crd|CRD)'): 'crd',
re.compile('(xtc|XTC)'): 'XTC'}
class ReaderTrajViaMDA(ReaderTraj):
def open_trajectory(self):
topology = splitext(self.topology)[1][1:]
for afk in list(mda_available_formats.keys()):
if afk.match(topology):
topology = mda_available_formats[afk]
break
trajectory = splitext(self.trajectory[0])[1][1:]
for afk in list(mda_available_formats.keys()):
if afk.match(trajectory):
trajectory = mda_available_formats[afk]
break
#print("%"*80)
#print(topology,trajectory)
return mda.Universe(self.topology,
self.trajectory,
topology_format=topology,
format=trajectory)
def close_trajectory(self):
if hasattr(self, 'trajectory_object'):
if hasattr(self.trajectory_object, 'trajectory'):
if hasattr(self.trajectory_object.trajectory, 'close'):
self.trajectory_object.trajectory.close()
def set_real_frame(self, real_frame):
self.real_frame_nr = real_frame
self.trajectory_object.trajectory[real_frame]
def parse_selection(self, selection):
return AtomSelection({self.number: self.trajectory_object.select_atoms(selection).atoms.ix})
def select_all(self):
return self.parse_selection("name *")
def atom2residue(self, atomid):
return self.trajectory_object.atoms[atomid].residue.ix
def atoms_positions(self, atomids):
# atoms ids to coordinates
return self.trajectory_object.atoms[atomids].positions
def residues_positions(self, resids):
# residues ids to center of masses coordinates
return (res.atoms.center_of_geometry() for res in
self.trajectory_object.residues[list(resids)])
def residues_names(self, resids):
# residues ids to center of masses coordinates
for rid in resids:
yield self.trajectory_object.residues[rid].resname
def real_number_of_frames(self):
# should return number of frames
return len(self.trajectory_object.trajectory)
def atoms_masses(self, atomids):
return self.trajectory_object.atoms[atomids].masses
def atom_vdw(self, atomid):
element = str(guess_atom_element(self.trajectory_object.atoms[atomid].name)).lower()
if element in VdW_radii:
return VdW_radii[element]
element = str(self.trajectory_object.atoms[atomid].element).lower()
if element in VdW_radii:
return VdW_radii[element]
return 1.4
def dump_frames_to_file(self, frames, selection, filename):
mdawriter = mda.Writer(filename, multiframe=True)
to_dump = self.trajectory_object.select_atoms(selection)
for frame in frames:
self.set_frame(frame)
mdawriter.write(to_dump)
mdawriter.close()
################################################################################
# Selection objects
class Selection(ReaderAccess):
def __init__(self, selected):
self.selected = OrderedDict(selected)
for number, ids in self.selected.items():
self.selected[number] = list(map(defaults.int_default, ids))
def layer(self, number):
if number in self.selected:
return self.__class__({number: self.selected[number]})
return self.__class__({})
def numbers(self):
return list(self.selected.keys())
def ix(self, ix):
# gets selection of index ix
ix_current = 0
for number, ids in self.selected.items():
if ix_current + len(ids) >= ix + 1:
# it is here!
return self.__class__({number: [ids[ix - ix_current]]}) # FIXME: looks like a bug!
ix_current += len(ids)
raise IndexError()
def single(self, selection_id):
# TODO: suboptimal and probably does not make any sense
if selection_id[0] in self.selected:
if selection_id[-1] in self.selected[selection_id[0]]:
return self.__class__({selection_id[0]: [selection_id[-1]]})
raise IndexError()
def len(self):
_len = 0
for number, ids in self.selected.items():
_len += len(ids)
return _len
def __len__(self):
return self.len()
def add(self, other):
for number, ids in other.selected.items():
if number in self.selected:
self.selected[number] = self.selected[number] + list(ids)
else:
self.selected.update({number: ids})
def remove(self, other):
"""
Remove all items that exist in other selection.
:param other: Other selection.
"""
empty_keys = []
for number, ids in other.selected.items():
self.selected[number] = [id_ for id_ in self.selected[number] if id_ not in ids]
if not self.selected[number]:
empty_keys.append(number)
for k in empty_keys:
del self.selected[k]
def uniquify(self):
for number, ids in self.selected.items():
self.selected[number] = sorted(set(ids))
def ids(self):
# report it in this order! always!
# these are not unique! run run uniqify first!
for number, ids in self.selected.items():
for i in ids:
yield (number, i)
def coords(self):
# order of coords should be the same as in ids!
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
def center_of_mass(self):
raise NotImplementedError("This is abstract class. Missing implementation in a child class.")
################################################################################
class AtomSelection(Selection):
def vdw(self):
for number, ids in self.selected.items():
for aid in ids:
yield self.get_reader(number).atom_vdw(aid)
def residues(self):
# returns residues selection
def get_unique_residues():
for number, ids in self.selected.items():
number_reader = self.get_reader(number)
yield number, sorted(set(map(number_reader.atom2residue, ids)))
return ResidueSelection(get_unique_residues())
def coords(self):
for number, ids in self.selected.items():
number_reader = self.get_reader(number)
for coord in number_reader.atoms_positions(ids): # .tolist():
yield coord
def center_of_mass(self):
center = np.zeros(3)
total_mass = 0
for number, ids in self.selected.items():
masses = self.get_reader(number).atoms_masses(ids)
masses.shape = (len(masses), 1)
total_mass += sum(masses)
center += (masses * self.get_reader(number).atoms_positions(ids)).sum(0)
return center / total_mass
def contains_residues(self, other_residues, convex_hull=False, convex_hull_inflate=None, map_fun=None,
known_true=None):
# FIXME: known_true slows down!
# known_true are only ix!
assert isinstance(other_residues, ResidueSelection)
if map_fun is None:
map_fun = map
# known_true should be list of ids
if known_true is not None and len(known_true):
this_ids = list(self.residues().ids())
kt_list = []
ch_list = []
other_coords = []
for other_id, other_coord in zip(other_residues.ids(), other_residues.coords()):
if other_id[-1] in known_true:
kt_list.append(other_id)
elif convex_hull:
ch_list.append(other_id)
other_coords.append(other_coord)
elif other_id in this_ids:
kt_list.append(other_id)
if convex_hull:
chull = self.chull(inflate=convex_hull_inflate)
# ch_results = map_fun(is_point_within_convexhull, izip_longest(other_coords, [], fillvalue=chull))
ch_results = are_points_within_convexhull(other_coords, chull)
# final merging loop
final_results = []
for other_id in other_residues.ids():
if other_id in ch_list:
final_results.append(ch_results[ch_list.index(other_id)])
elif other_id in kt_list:
final_results.append(True)
else:
final_results.append(False)
return final_results
else:
if convex_hull:
other_coords = other_residues.coords()
chull = self.chull(inflate=convex_hull_inflate)
# return map_fun(is_point_within_convexhull, izip_longest(other_coords, [], fillvalue=chull))
return are_points_within_convexhull(other_coords, chull)
else:
# check if other selection is empty
this_ids = list(self.residues().ids())
return [other_id in this_ids for other_id in other_residues.ids()]
def containing_residues(self, other_residues, *args, **kwargs):
# Convenience wrapper for contains_residues.
# def get_res_in_scope(is_res_in_scope, res):
other_new = OrderedDict()
for iris, (number, resid) in zip(self.contains_residues(other_residues, *args, **kwargs), other_residues.ids()):
if iris:
if number in other_new:
other_new[number].append(resid)
else:
other_new.update({number: [resid]})
return ResidueSelection(other_new)
def chull(self, inflate=None):
if self.len() > 3:
return SciPyConvexHull(list(self.coords()), inflate=inflate)
################################################################################
class ResidueSelection(Selection):
def coords(self):
for number, ids in self.selected.items():
number_reader = self.get_reader(number)
for coord in number_reader.residues_positions(ids):
yield coord # .tolist()
def names(self):
for number, ids in self.selected.items():
for name in self.get_reader(number).residues_names(ids):
yield name
def single_residues(self):
for resid in self.ids():
yield SingleResidueSelection(resid)
################################################################################
@memory
def coords_range_core(srange, number, rid):
assert isinstance(srange, SmartRangeIncrement), "Expecting SmartRangeIncrement, got %r" % type(srange)
@arrayify(shape=(None, 3))
def coords_range_core_inner(srange, number, rid):
reader = Reader.get_single_reader(number).open()
for f in srange.get():
reader.set_frame(f)
yield next(reader.residues_positions([rid]))
return coords_range_core_inner(srange, number, rid)
@arrayify(shape=(None, 3))
def coords_range(srange, number, rid):
# srange is SmartRangeIncrement, it cannot be anything else
# wrapper to limit number of calls to coords_range_core
# CRIC cache is {number:{rid:FramesRangeCollection}}
logger.debug("CRIC global request %d:%d %s", number, rid, str(srange))
'''
if number not in CRIC.cache:
CRIC.cache.update({number:{}})
logger.debug("CRIC new number %d",number)
if rid not in CRIC.cache[number]:
CRIC.cache[number].update({rid: FramesRangeCollection()})
logger.debug("CRIC new rid %d",rid)
'''
# call get_ranges and stack array, do it in comprehension? nested function?
def get_coords_from_cache():
for sr, xr in CRIC.get_frc(number, rid).get_ranges(srange):
logger.debug("CRIC partial request %d:%d %s", number, rid, str(sr))
yield coords_range_core(sr, number, rid)[xr, :]
return np.vstack(tuple(get_coords_from_cache()))
################################################################################
@memory
def smooth_coords_ranges(sranges, number, rid, smooth):
@tupleify
def smooth_coords_ranges_inner(sranges, number, rid, smooth):
# here, sranges are list of SmartRange objects whereas coords_range accepts SmartRangeIncrement
# first get all coords and make in continous
def sranges2coords_cont():
for srange in sranges:
for srangei in srange.raw:
yield coords_range(srangei, number, rid)
coords_cont = make_default_array(np.vstack([c for c in sranges2coords_cont() if len(c) > 0]))
# call smooth
coords_cont = smooth(coords_cont)
# split coords_cont
# now lets return tupple of coords
nr = 0
for path in sranges:
if len(path) > 0:
yield make_default_array(coords_cont[nr:nr + len(path)])
nr += len(path)
else:
# TODO: reuse some kind of empty coords
yield make_default_array(np.zeros((0, 3)))
return smooth_coords_ranges_inner(sranges, number, rid, smooth)
################################################################################
class SingleResidueSelection(ReaderAccess):
def __init__(self, resid):
super(SingleResidueSelection, self).__init__()
# where resid is id reported by ResidueSelection and reader is Reader
# resid is tuple (number,id) number is used to get reader_traj
self.resid = resid[-1]
self.number = resid[0]
# to optionally simplify CRIC store lets read minimal and maximal frame
# and always ask for full range and then slice it according to frames.
traj_reader = self.get_reader(self.number)
# full range always
self.always_request_frames = SmartRangeIncrement(0,traj_reader.number_of_frames())
def coords(self, frames):
if isinstance(frames, SmartRange):
if len(frames):
if GCS.cachetype == 'simple':
# this is naive version
full_coords = coords_range(self.always_request_frames,self.number,self.resid)
#return np.vstack([full_coords[list(srange.get()),:] for srange in frames.raw])
def get_partial_coords():
for sr in frames.raw:
lo = sr.first_element() - self.always_request_frames.first_element()
hi = lo + len(sr)
yield full_coords[lo:hi,:]
return np.vstack(list(get_partial_coords()))
else: # if full
# the normal way
return np.vstack([coords_range(srange, self.number, self.resid) for srange in frames.raw])
return self._coords([])
else:
return self._coords(frames)
@arrayify(shape=(None, 3))
def _coords(self, frames):
# return coords for frames
if len(frames):
# print "Trying to get reader",self.number,"for",len(frames),"frames"
traj_reader = self.get_reader(self.number)
for f in frames:
traj_reader.set_frame(f)
yield next(traj_reader.residues_positions([self.resid]))
def coords_smooth(self, sranges, smooth):
for coord in smooth_coords_ranges(sranges, self.number, self.resid, smooth):
yield coord
################################################################################
# flag sandwich as imported
GCS.sandwich_import = True