# -*- coding: utf-8 -*-
# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2018-2019 Tomasz Magdziarz <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from aquaduct.traj.paths import GenericPaths
from collections import defaultdict
import operator
import numpy as np
from multiprocessing import Manager
from scipy import spatial
[docs]def get_spc(sp, window=None):
'''
:param sp: Single path like object or Generic path.
:param tuple window: Optional frames window.
:rtype: numpy.ndarray
:return: Coordinates of path; to be used in pocket calculation.
'''
if isinstance(sp, GenericPaths):
if window is None:
return sp.coords
f = np.array(sp.frames)
i = (f >= window[0]) & (f <= window[1])
return get_spc(sp)[i]
else:
if window is None:
return sp.coords_cont
f = np.array(sp.paths_cont)
i = (f >= window[0]) & (f <= window[1])
return get_spc(sp)[i]
[docs]def find_minmax(spaths, pbar=None):
'''
:param list spaths: List of single like path objects.
:param pbar: Optional progress object providing next() method.
:rtype: 2 element tuple of numpy.ndarray each of shape (3,)
:return: Minimal and maximal boundaries of coordinates used in pocket calulations of spaths.
'''
minc = np.array([float('inf')] * 3)
maxc = np.array([float('-inf')] * 3)
for sp in spaths:
sp_minc = get_spc(sp).min(0)
minc_i = minc > sp_minc
minc[minc_i] = sp_minc[minc_i]
sp_maxc = get_spc(sp).max(0)
maxc_i = maxc < sp_maxc
maxc[maxc_i] = sp_maxc[maxc_i]
if pbar:
pbar.next()
minc = np.floor(minc)
maxc = np.ceil(maxc)
return minc, maxc
def find_minmax_single(sp):
coords = get_spc(sp)
return coords.min(0), coords.max(0)
[docs]def find_minmax_map(spaths, pbar=None, map_fun=None):
'''
:param list spaths: List of single like path objects.
:param pbar: Optional progress object providing next() method.
:rtype: 2 element tuple of numpy.ndarray each of shape (3,)
:return: Minimal and maximal boundaries of coordinates used in pocket calulations of spaths.
'''
minc = np.array([float('inf')] * 3)
maxc = np.array([float('-inf')] * 3)
if map_fun is None:
map_fun = imap
for sp_minc, sp_maxc in map_fun(find_minmax_single, spaths):
minc_i = minc > sp_minc
minc[minc_i] = sp_minc[minc_i]
maxc_i = maxc < sp_maxc
maxc[maxc_i] = sp_maxc[maxc_i]
if pbar:
pbar.next()
minc = np.floor(minc)
maxc = np.ceil(maxc)
return minc, maxc
[docs]def find_edges(spaths, grid_size=1., pbar=None, map_fun=None):
'''
:param list spaths: List of single like path objects.
:param float grid_size: Size of grid cell in A.
:param pbar: Optional progress object providing next() method.
:rtype: list of numpy.ndarrays
:return: Edges of bins of grid spanning all submited paths.
'''
return [np.linspace(mi, ma, int((ma - mi) / grid_size) + 1) for mi, ma in
zip(*find_minmax_map(spaths, pbar=pbar, map_fun=map_fun))]
class distribution_worker(object):
def __init__(self, edges=None, window=None):
self.edges = edges
self.window = window
def __call__(self, sp):
return np.histogramdd(get_spc(sp, window=self.window), bins=self.edges)[0]
[docs]def distribution(spaths, grid_size=1., edges=None, window=None, pbar=None, map_fun=None):
'''
:param list spaths: List of single like path objects.
:param float grid_size: Size of grid cell in A.
:param list of numpy.ndarrays edges: Edges of bins of grid spanning all submited paths.
:param tuple window: Optional frames window.
:param pbar: Optional progress object providing next() method.
:rtype tuple of numpy.ndarrays
:return: Coordinates of pocket and number of points.
'''
maxc = np.array(list(map(max, edges)))
minc = np.array(list(map(min, edges)))
# H = np.zeros(map(int,map(np.ceil,(maxc - minc) / grid_size)))
H = np.zeros(tuple([len(e) - 1 if len(e) > 1 else 1 for e in edges]))
if map_fun is None:
map_fun = map
map_worker = distribution_worker(edges=edges, window=window)
for h in map_fun(map_worker, spaths):
H += h
if pbar:
pbar.next()
mg = [ee[:-1] + (grid_size / 2.) for ee in edges]
x, y, z = np.meshgrid(*mg, indexing='ij')
pocket = H > 0
return np.vstack((x[pocket], y[pocket], z[pocket])).T, H[pocket]
[docs]def outer_inner(H, threshold=None):
'''
:param numpy.ndarray H: Pocket distribution.
:param float threshold: Percent value of max density which will be used to partition pocket into inner and outer.
:return: Indices of outer and inner pocket.
:rtype: tuple of numpy.ndarray
'''
if threshold:
assert threshold <= 1.0, "Threshold cannot be higher that 1.0."
assert threshold > 0.0, "Threshold cannot be equal or less than 0."
if (H > 0).any():
OI = H / H[H > 0].mean() if not threshold else H / (H[H > 0].max() * threshold)
return OI < 1, OI >= 1
return np.ones(H.shape) > 1, np.ones(H.shape) == 0 # fall back to outer pocket only
def windows(frames, windows=None, size=None):
yield (0., frames - 1.) # full window
if windows:
if size:
begs = np.linspace(0, frames - size, windows)
ends = np.array([b + size - 1 for b in begs])
if ends[-1] > frames - 1:
ends[-1] = frames - 1
else:
begs = np.linspace(0, frames, windows + 1)[:-1]
ends = np.linspace(-1, frames - 1, windows + 1)[1:]
if ends[-1] < frames - 1:
ends[-1] = frames - 1
for b, e in zip(begs, ends):
yield np.floor(b), np.floor(e)
def sphere_radii(spaths, centers=None, radii=None, window=None, pbar=None, map_fun=None):
H = np.zeros(len(centers), dtype=np.int32)
if map_fun is None:
map_fun = imap
for sp in spaths:
coords = get_spc(sp, window=window)
D = spatial.distance.cdist(coords, centers)
for nr, radius in enumerate(radii):
H[nr] += np.count_nonzero(D[:, nr] <= radius)
if pbar:
pbar.next()
return H
class sphere_radius_worker(object):
def __init__(self, window, centers, radius):
self.window = window
self.centers = centers
self.radius = radius
def __call__(self, sp):
coords = get_spc(sp, window=self.window)
D = spatial.distance.cdist(coords, self.centers) <= self.radius
return np.count_nonzero(D, 0)
class sphere_radius_worker_lowmem(object):
def __init__(self, window, centers, radius):
self.window = window
self.centers = centers
self.radius = radius
def __call__(self, sp):
coords = get_spc(sp, window=self.window)
g = (int(np.count_nonzero(spatial.distance.cdist(coords, [c]) <= self.radius)) for c in self.centers)
return np.fromiter(g, dtype=np.int32)
def sphere_radius(spaths, centers=None, radius=2., window=None, pbar=None, map_fun=None):
H = np.zeros(len(centers), dtype=np.int32)
if map_fun is None:
map_fun = imap
map_worker = sphere_radius_worker_lowmem(window, centers, radius)
for h in map_fun(map_worker, spaths):
H += h
if pbar:
pbar.next()
return H
class sphere_density_raw_worker(object):
def __init__(self, pbar_queue, mol_name, radius, centers, frames):
"""
Worker used to calculate energy per each trajectory.
:param pbar_queue: Progress bar instance
:type pbar_queue: class:`aquaduct.clui.SimpleProgressBar`
:param mol_name: Name of molecule which amount will be calculated around points.
:type mol_name: str
:param radius: Radius of selection for each point.
:type radius: float
:param centers: Points of path
:type centers: Iterator
:param frames: Collection of frame numbers
:type frames: Iterator
"""
self.pbar_queue = pbar_queue
self.mol_name = mol_name
self.radius = radius
self.centers = centers
self.frames = frames
def __call__(self, traj_reader):
"""
Worker
:param traj_reader: Trajectory reader
:type traj_reader: Iterator
:return: Array with density for each point
:rtype: numpy.ndarray
"""
H = np.zeros(len(self.centers))
traj_reader = traj_reader.open()
# Iterating over trajectory reader starts from 0.
# It's needed to add value of first frame to check if frame is within frames range
base = traj_reader.window.start
for frame in traj_reader.iterate():
# Skip if frame is not within range
if frame + base not in self.frames:
continue
# Calculate density of points for frame
for i, c in enumerate(self.centers):
selection_str = "({}) and (point {} {} {} {})".format(self.mol_name, *(tuple(c) + (self.radius,)))
traj_reader.parse_selection(selection_str)
H[i] += len(list(traj_reader.parse_selection(selection_str).residues().ids()))
# Indicate update of progress bar
self.pbar_queue.put(1)
return H
[docs]def sphere_density_raw(trajs, mol_name, centers, radius, pool, window=None, pbar=None):
"""
Calculate density of sphere with specified radius and with center in each point.
:param trajs: Collection of trajectory readers
:type trajs: class:`aquaduct.sandwich.ReaderTraj`
:param mol_name: Name of molecule which amount will be calculated around points.
:type mol_name: str
:param radius: Radius of selection for each point.
:type radius: float
:param centers: Points of path
:type centers: Iterator
:param pool: Preconfigured Pool instance
:type pool: Preconfigured Pool instance
:return: Density for each center.
:rtype: numpy.ndarray
"""
# Setup for queue and worker
pbar_queue = Manager().Queue()
worker = sphere_density_raw_worker(pbar_queue, mol_name, radius, centers, list(range(int(window[0]), int(window[1]))))
# Container for collected data from workers
D = []
r = pool.map_async(worker, trajs, callback=D.append)
# Incrementing progress bar using queued values
for p in iter(pbar_queue.get, None):
pbar.next(step=p)
if pbar.current == pbar.maxval:
break
r.wait()
# Return summed results from processes
return np.array(D[0]).sum(axis=0)
def hot_spots(H):
return hot_spots_his(H)
def hot_spots_his(H, bins=(5, 101)):
bn = []
for b in range(*bins):
his = np.histogram(H, bins=b)
if 0 in his[0]:
i = int(np.argwhere(his[0] == 0)[0])
bn.append((b, sum(his[0][i:])))
if len(bn):
i = np.argmax(np.array(bn)[:, 1])
b = bn[i][0]
his = np.histogram(H, bins=b)
hs = np.argwhere(his[0] == 0)
return his[1][hs][0]