# -*- coding: utf-8 -*-
# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2016-2018 Tomasz Magdziarz, Alicja Płuciennik, Michał Stolarczyk <info@aquaduct.pl>
# Copyright (C) 2019 Tomasz Magdziarz, Michał Banas <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import gc
from itertools import chain
import numpy as np
from aquaduct.apps.data import GCS
from aquaduct import logger
from aquaduct.traj.sandwich import open_traj_reader, ResidueSelection
from aquaduct.utils.helpers import create_tmpfile, iterate_or_die
from aquaduct.traj.paths import GenericPaths
from aquaduct.apps.valve.helpers import results_n


class assign_nonsandwiched_paths(object):
    """
    Worker that assigns non-sandwiched paths to the object container.
    """
def __call__(self, args):
"""
:param args: tuple with object and paths
:return: aquaduct.traj.paths.GenericPaths
"""
pat, nfos = args
pat.add_012(nfos)
return pat


class assign_sandwiched_paths(object):
    """
    Worker that assigns sandwiched paths to the object container.
    """
def __init__(self, all_res_ids, all_res_names, max_pf, results):
"""
Constructor
:param all_res_ids: residues ids
:param all_res_names: residues names
:param max_pf: maximum possible frame
:param results: residue coords
:param pbar: progress bar
"""
self.all_res_ids = all_res_ids
self.all_res_names = all_res_names
self.max_pf = max_pf
self.results = results

    def __call__(self, pnr):
        """
        :param pnr: index of the residue (position in all_res_ids)
        :return: aquaduct.traj.paths.GenericPaths
        """
new_p = GenericPaths((0, self.all_res_ids[pnr]),
name_of_res=self.all_res_names[pnr],
min_pf=0, max_pf=self.max_pf)
new_p.add_012(
np.fromiter(
chain(*(results_n(self.results[n])[:, pnr] for n in sorted(self.results.keys()))),
dtype=np.int8))
return new_p


def stage_I_worker_q(input_queue, results_queue, pbar_queue):
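    """
    Stage I worker: traces object residues within the scope.

    Consumes work items from ``input_queue`` until a ``None`` sentinel is
    received. For each layer it iterates over all frames, collects residues
    of the object selection that are inside the scope, remembers per-frame
    IDs of residues in the object and accumulates the center of mass of the
    scope selection. Results are put on ``results_queue`` keyed by the layer
    number; progress is reported through ``pbar_queue``.
    """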
for input_data in iter(input_queue.get, None):
layer_number, traj_reader_proto, scope_everyframe, scope, scope_convexhull, scope_convexhull_inflate, object_selection, progress_freq = input_data
center_of_system = np.zeros(3)
all_res = None
traj_reader = open_traj_reader(traj_reader_proto)
if not scope_everyframe:
scope_selection = traj_reader.parse_selection(scope)
progress = 0
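        # adaptive progress reporting: start by reporting every frame and
        # double the reporting interval after each report, up to progress_freq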
progress_freq_flex = min(1, progress_freq)
frame_rid_in_object = []
# the loop over frames
for frame in traj_reader.iterate_over_frames():
if scope_everyframe:
scope_selection = traj_reader.parse_selection(scope)
# center of system
center_of_system += scope_selection.center_of_mass()
# current res selection
res = traj_reader.parse_selection(object_selection).residues()
            # find matching residues, i.e. those that are in the scope:
res_new = scope_selection.containing_residues(res, convex_hull=scope_convexhull,
convex_hull_inflate=scope_convexhull_inflate)
res_new.uniquify() # here is a list of residues in this layer that are in the object and in the scope
# TODO: change way of center_of_system calculation to reflect center of object?
# adds them to all_res
if all_res:
all_res.add(res_new)
all_res.uniquify()
else:
all_res = res_new
            # remember IDs of residues in the object in the current frame
if res_new is not None:
frame_rid_in_object.append([rid[-1] for rid in res_new.ids()])
else:
frame_rid_in_object.append([])
progress += 1
if progress == progress_freq_flex:
pbar_queue.put(progress)
progress = 0
progress_freq_flex = min(progress_freq_flex * 2, progress_freq)
        # send results
results_queue.put({layer_number: (all_res, frame_rid_in_object, center_of_system)})
pbar_queue.put(progress)


def stage_II_worker_q(input_queue, results_queue, pbar_queue):
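    """
    Stage II worker: builds per-frame object/scope membership data.

    Consumes work items from ``input_queue`` until a ``None`` sentinel is
    received. For each layer it fills a (frames x residues) array of 012
    values, where each cell is the sum of the object and scope membership
    flags of a residue in a given frame. The array (or, if a cache directory
    is used, the file name and shape of its memory map) is put on
    ``results_queue`` keyed by the layer number; progress is reported
    through ``pbar_queue``.
    """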
# input queue loop
for input_data in iter(input_queue.get, None):
# get data
layer_number, traj_reader_proto, scope_everyframe, scope, scope_convexhull, scope_convexhull_inflate, object_selection, all_res_this_layer, frame_rid_in_object, is_number_frame_rid_in_object, progress_freq = input_data
# open trajectory and get number of frames
traj_reader = open_traj_reader(traj_reader_proto)
number_of_frames = traj_reader.number_of_frames()
        # the scope is evaluated only once, before the loop over frames, so it cannot be frame dependent
if not scope_everyframe:
scope = traj_reader.parse_selection(scope)
logger.debug("Scope definition evaluated only once for given layer")
else:
logger.debug("Scope definition evaluated in every frame, this might be very slow.")
# get ids only once
all_res_this_ids = list(all_res_this_layer.ids())
        # container for 012 path data: each cell stores the sum of the object
        # and scope membership flags (0, 1 or 2) of a residue in a given frame
        # if a cache directory is configured, back the array with a memory-mapped temporary file
if GCS.cachedir:
number_frame_object_scope = np.memmap(create_tmpfile(ext='dat', dir=GCS.cachedir),
dtype=np.int8,
shape=(number_of_frames, all_res_this_layer.len()))
else:
number_frame_object_scope = np.zeros((number_of_frames, all_res_this_layer.len()),
dtype=np.int8)
        # progress is reported periodically
progress = 0
progress_gc = 0
progress_freq_flex = min(1, progress_freq)
        # the loop over frames; per-frame object residue IDs are lazily zipped with the frames
for rid_in_object, frame in zip(iterate_or_die(frame_rid_in_object, times=number_of_frames),
traj_reader.iterate_over_frames()):
# do we have object data?
if not is_number_frame_rid_in_object:
rid_in_object = [rid[-1] for rid in traj_reader.parse_selection(object_selection).residues().ids()]
# assert rid_in_object is not None
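            # lazy boolean flags, aligned with all_res_this_ids, marking residues
            # that are in the object in this frame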
is_res_in_object = (rid[-1] in rid_in_object for rid in all_res_this_ids)
# should scope be evaluated?
if scope_everyframe:
scope = traj_reader.parse_selection(scope)
            # check whether all residues of this layer are in the scope
is_res_in_scope = scope.contains_residues(all_res_this_layer, convex_hull=scope_convexhull,
convex_hull_inflate=scope_convexhull_inflate,
known_true=None) # known_true could be rid_in_object
# store results in the container
number_frame_object_scope[frame, :] = np.array(list(map(sum, zip(is_res_in_object, is_res_in_scope))),
dtype=np.int8)
# increase progress counter and report progress if needed
progress += 1
progress_gc += 1
if progress == progress_freq_flex:
pbar_queue.put(progress)
progress = 0
progress_freq_flex = min(progress_freq_flex * 2, progress_freq)
if GCS.cachedir:
number_frame_object_scope.flush()
if progress_gc == progress_freq * 100:
gc.collect()
progress_gc = 0
        # send results to results_queue
        # if cached, pass the memmap file name and shape instead of the array itself
if GCS.cachedir:
number_frame_object_scope.flush()
results_queue.put({layer_number: (number_frame_object_scope.filename, number_frame_object_scope.shape)})
del number_frame_object_scope
else:
results_queue.put({layer_number: number_frame_object_scope})
if progress:
pbar_queue.put(progress)
# termination
gc.collect()


def stage_II_worker_q_twoways(input_queue, results_queue, pbar_queue):
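    """
    Two-way variant of the stage II worker.

    Works like ``stage_II_worker_q`` but iterates over the frames of each
    layer twice, forward and then in reverse order, limiting the scope
    (convex hull) checks to residues whose membership is not already known
    from the object data, the previous frame or the forward pass. Results
    and progress reporting follow ``stage_II_worker_q``.
    """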
# input queue loop
for input_data in iter(input_queue.get, None):
# get data
layer_number, traj_reader_proto, scope_everyframe, scope, scope_convexhull, scope_convexhull_inflate, object_selection, all_res_this_layer, frame_rid_in_object, is_number_frame_rid_in_object, progress_freq = input_data
# open trajectory and get number of frames
traj_reader = open_traj_reader(traj_reader_proto)
number_of_frames = traj_reader.number_of_frames()
        # the scope is evaluated only once, before the loop over frames, so it cannot be frame dependent
if not scope_everyframe:
scope = traj_reader.parse_selection(scope)
logger.debug("Scope definition evaluated only once for given layer")
else:
logger.debug("Scope definition evaluated in every frame, this might be very slow.")
# get ids only once
all_res_this_ids = list(all_res_this_layer.ids())
        # container for 012 path data: each cell stores the sum of the object
        # and scope membership flags (0, 1 or 2) of a residue in a given frame
        # if a cache directory is configured, back the array with a memory-mapped temporary file
if GCS.cachedir:
number_frame_object_scope = np.memmap(create_tmpfile(ext='dat', dir=GCS.cachedir),
dtype=np.int8,
shape=(number_of_frames, all_res_this_layer.len()))
else:
number_frame_object_scope = np.zeros((number_of_frames, all_res_this_layer.len()),
dtype=np.int8)
        # the loop over frames; per-frame object residue IDs are lazily zipped with the frames
progress = 0
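        # two passes over the trajectory: the forward pass computes the full
        # object/scope membership, the reverse pass re-checks only residues
        # whose membership is still unknown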
for reverse in (False, True):
            # progress is reported periodically
progress_gc = 0
progress_freq_flex = min(1, progress_freq)
            # which residues of all_res should still be evaluated?
all_res_eval = np.ones(len(all_res_this_ids), dtype=bool)
for rid_in_object, frame in zip(
iterate_or_die(frame_rid_in_object, times=number_of_frames, reverse=reverse),
traj_reader.iterate_over_frames(reverse=reverse)):
# do we have object data?
if not is_number_frame_rid_in_object:
rid_in_object = [rid[-1] for rid in traj_reader.parse_selection(object_selection).residues().ids()]
# assert rid_in_object is not None
is_res_in_object = np.fromiter((rid[-1] in rid_in_object for rid in all_res_this_ids), dtype=bool)
# if in object do not do scope check
all_res_eval[is_res_in_object] = False
if reverse:
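                    # skip residues already marked for this frame by the forward pass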
all_res_eval[number_frame_object_scope[frame, :] > 0] = False
all_res_this_ids_eval = (i[-1] for te, i in zip(all_res_eval, all_res_this_ids) if te)
all_res_this_layer_eval = ResidueSelection(
{all_res_this_layer.numbers()[0]: list(all_res_this_ids_eval)})
# should scope be evaluated?
if scope_everyframe:
scope = traj_reader.parse_selection(scope)
                # check whether the remaining (not yet decided) residues are in the scope
is_res_in_scope_eval = scope.contains_residues(all_res_this_layer_eval, convex_hull=scope_convexhull,
convex_hull_inflate=scope_convexhull_inflate,
known_true=None) # known_true could be rid_in_object
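                # scatter the results for the evaluated subset back into a
                # full-length boolean mask over all_res_this_ids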
is_res_in_scope = np.zeros(len(all_res_this_ids), dtype=bool)
is_res_in_scope[
[int(nr) for nr, iris in zip(np.argwhere(all_res_eval), is_res_in_scope_eval) if iris]] = True
if not reverse:
is_res_in_scope[is_res_in_object] = True
# store results in the container
if reverse:
number_frame_object_scope[frame, is_res_in_scope] = 1
is_res_in_scope[is_res_in_object] = True
else:
number_frame_object_scope[frame, :] = np.array(list(map(sum, zip(is_res_in_object, is_res_in_scope))),
dtype=np.int8)
# increase progress counter and report progress if needed
progress += 1
progress_gc += 1
if progress == progress_freq_flex:
pbar_queue.put(progress * 0.5)
progress = 0
progress_freq_flex = min(progress_freq_flex * 2, progress_freq)
if GCS.cachedir:
number_frame_object_scope.flush()
if progress_gc > progress_freq * 100:
gc.collect()
progress_gc = 0
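                # in the next frame only residues that were in the scope (or
                # in the object) in this frame are re-evaluated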
all_res_eval = np.zeros(len(all_res_this_ids), dtype=bool)
all_res_eval[is_res_in_scope] = True
        # send results to results_queue
        # if cached, pass the memmap file name and shape instead of the array itself
if GCS.cachedir:
number_frame_object_scope.flush()
results_queue.put({layer_number: (number_frame_object_scope.filename, number_frame_object_scope.shape)})
del number_frame_object_scope
else:
results_queue.put({layer_number: number_frame_object_scope})
gc.collect()
pbar_queue.put(progress * 0.5)
# termination