Source code for aquaduct.geom.cluster

# -*- coding: utf-8 -*-

# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2016-2018  Tomasz Magdziarz, Alicja Płuciennik, Michał Stolarczyk <info@aquaduct.pl>
# Copyright (C) 2019  Tomasz Magdziarz <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
This module provides functions for clustering.
Clustering is done by :mod:`scikit-learn` module.
"""


import numpy as np
from sklearn.cluster import KMeans, MeanShift, estimate_bandwidth
from aquaduct.utils.helpers import Auto
from aquaduct.utils import clui
from aquaduct.traj.barber import WhereToCut
from aquaduct.geom import Sphere


# problems with clustering methods and size of set
# DBSCAN:              n > 0
# AffinityPropagation: n > 0
# KMeans:              n > clusters
# MeanShift:           n > 6


[docs]class BarberClusterResult(object): """ Helper class for results of barber clustering. """ def __init__(self, labels_): self.labels_ = np.array(labels_)
[docs]class BarberCluster(object): ''' Wrapper class that implements *barber* clustering. '''
[docs] @staticmethod def fit(coords, spheres=None, radii=None): ''' :param Iterable coords: Input coordinates of points to be clustered. :param Iterable spheres: Input spheres for each point. Each sphere has center and radius. :param Iterable radii: Radii to create spheres using coords. ''' wtc = WhereToCut() if spheres is not None: wtc.spheres = spheres elif radii is not None: spheres = [Sphere(center=center, radius=radius, nr=nr) for nr, (center, radius) in enumerate(zip(coords, radii))] wtc.spheres = spheres else: raise TypeError('Either spheres or radii have to be specified.') clouds = wtc.cloud_groups(progress=True) # clouds to labels! labels = np.zeros(len(spheres)) spheres_id = [s.nr for s in spheres] for cloud_id, cloud in clouds.items(): labels[[spheres_id.index(c) for c in cloud]] = cloud_id return BarberClusterResult(labels)
[docs]def MeanShiftBandwidth(X, **kwargs): ''' Helper function for automatic calculation of a bandwidth for MeanShift method. :param Iterable X: Coordinates of points to be clustered. ''' if 'bandwidth' in kwargs: if kwargs['bandwidth'] is Auto: bandwidth = estimate_bandwidth(np.array(X), quantile=0.5) # TODO: change it to the default value of 0.3 or use it as option? if not bandwidth: bandwidth = None clui.message("Meanshift automatic bandwidth calculation returned 0; setting bandwidth to None.") else: clui.message("Meanshift automatic bandwidth calculation: bandwidth = %f" % float( bandwidth)) kwargs.update({'bandwidth': bandwidth}) return kwargs
[docs]class PerformClustering(object): ''' Helper class for clustering. ''' # aqeuduct clustering helper class def __init__(self, method, **kwargs): ''' :param object method: Class that implements cclustering via *fit* method. ''' self.method = method self.method_kwargs = kwargs self.method_results = None self.clusters = None def __str__(self): out = str(self.method.__name__) return out def __call__(self, coords, spheres=None): # compatibility return self.fit(coords, spheres=spheres) def _get_noclusters(self, n): return [0] * n def _get_oneclusters(self, n): return [1] * n
[docs] def fit(self, coords, spheres=None): ''' :param Iterable coords: Input coordinates of points to be clustered. :param Iterable spheres: Input spheres for each point. Optional, important only if :attr:`method` is :class:`BarberCluster`. :return: Clusters numbers. :rtype: list of int ''' # spheres are used for Barber only if len(coords) < 2: # single point forms one cluster self.clusters = self._get_oneclusters(len(coords)) return self.clusters # special cases if self.method is BarberCluster: method = self.method() self.method_results = method.fit(coords, spheres) self.clusters = list(map(int, self.method_results.labels_ + 1)) else: if self.method is MeanShift: if len(coords) < 6: self.clusters = self._get_noclusters(len(coords)) clui.message( "Number of objects %d < 6 is too small for MeanShift method. Skipping." % (len(coords))) return self.clusters method = self.method(**MeanShiftBandwidth(coords, **self.method_kwargs)) else: if self.method is KMeans: if 'n_clusters' in self.method_kwargs: if len(coords) < self.method_kwargs['n_clusters']: self.clusters = self._get_noclusters(len(coords)) clui.message( "Number of objects %d < %d is too small for KMeans method. Skipping." % ( len(coords), self.method_kwargs['n_clusters'])) return self.clusters method = self.method(**self.method_kwargs) self.method_results = method.fit(coords) self.clusters = list(map(int, self.method_results.labels_ + 1)) return self.clusters
[docs] def centers(self): ''' :return: Centers of clusters. ''' assert self.clusters is not None, "Perform clustering first." assert self.method_results is not None, "Perform clustering first." if hasattr(self.method_results, 'cluster_centers_'): return self.method_results.cluster_centers_ raise NotImplementedError('Cluster centers is not implemented for %r method yet.' % self.method)