Source code for chemistry_tools.spectrum_similarity

#!/usr/bin/env python3
#
#  spectrum_similarity.py
"""
Mass spectrum similarity calculations.
"""
#
#  Copyright (c) 2019-2020 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#  Adapted from SpectrumSimilarity.R
#  Part of OrgMassSpecR
#  |  Copyright (c) 2011-2017 Nathan Dodder <nathand@sccwrp.org>
#  |  Available under the BSD 2-Clause License
#  |
#  |  Redistribution and use in source and binary forms, with or without modification,
#  |  are permitted provided that the following conditions are met:
#  |
#  |    Redistributions of source code must retain the above copyright notice, this
#  |    list of conditions and the following disclaimer.
#  |
#  |    Redistributions in binary form must reproduce the above copyright notice, this
#  |    list of conditions and the following disclaimer in the documentation and/or
#  |    other materials provided with the distribution.
#  |
#  |  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
#  |  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
#  |  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
#  |  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
#  |  ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
#  |  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
#  |  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
#  |  ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
#  |  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
#  |  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

# stdlib
from typing import Mapping, Optional, Sequence, Tuple, Union

# 3rd party
import numpy
import pandas  # type: ignore[import]

__all__ = ["spectrum_similarity", "normalize", "create_array", "SpectrumSimilarity"]


[docs]class SpectrumSimilarity: """ Calculate the similarity score for two mass spectra. :param spec_top: Array containing the experimental spectrum's peak list with the *m/z* values in the first column and corresponding intensities in the second :param spec_bottom: Array containing the reference spectrum's peak list with the *m/z* values in the first column and corresponding intensities in the second :param b: numeric value specifying the baseline threshold for peak identification. Expressed as a percent of the maximum intensity. :param xlim: tuple of length 2, defining the beginning and ending values of the x-axis. .. versionadded:: 1.0.0 .. TODO: x_threshold: numeric value specifying t: numeric value specifying the tolerance used to align the *m/z* values of the two spectra. """ top_df: pandas.DataFrame _top_df_plot: pandas.DataFrame # includes peaks below ``b`` bottom_df: pandas.DataFrame _bottom_df_plot: pandas.DataFrame # includes peaks below ``b`` b: float alignment: pandas.DataFrame def __init__( self, spec_top: numpy.ndarray, spec_bottom: numpy.ndarray, # t: float = 0.25, b: float = 1, xlim: Tuple[int, int] = (50, 1200), # x_threshold: float = 0, ): # if x_threshold < 0: # raise ValueError("x_threshold argument must be zero or a positive number") self.b = b self.xlim = xlim # format spectra and normalize intensitites self.top_df, self._top_df_plot = self._build_dataframe(spec_top) self.bottom_df, self._bottom_df_plot = self._build_dataframe(spec_bottom) # align the m/z axis of the two spectra, the bottom spectrum is used as the reference # Unimplemented R code # for(i in 1:nrow(bottom)) # top["mz"][bottom["mz"][i] >= top["mz"] - t & bottom["mz"][i] <= top["mz"] + t] = bottom["mz"][i] # top[,1][bottom[,1][i] >= top[,1] - t & bottom[,1][i] <= top[,1] + t] <- bottom[,1][i] # alignment <- merge(top, bottom, by = 1, all = TRUE) # if(length(unique(alignment[,1])) != length(alignment[,1])) warning("the m/z tolerance is set too high") # alignment[,c(2,3)][is.na(alignment[,c(2,3)])] <- 0 # convert NAs to zero (R-Help, Sept. 15, 2004, John Fox) # names(alignment) <- c("mz", "intensity.top", "intensity.bottom") alignment = pandas.merge(self.top_df, self.bottom_df, on="mz", how="outer") self.alignment = alignment.fillna(value=0) # Convert NaN to 0 self.alignment.columns = ["mz", "intensity_top", "intensity_bottom"] reverse_alignment = pandas.merge(self.top_df, self.bottom_df, on="mz", how="right") self.reverse_alignment = reverse_alignment.dropna() # Remove rows containing NaN self.reverse_alignment.columns = ["mz", "intensity_top", "intensity_bottom"] def _calculate_score(self, alignment: pandas.DataFrame) -> float: u = numpy.array(alignment.iloc[:, 1]) v = numpy.array(alignment.iloc[:, 2]) return numpy.dot(u, v) / (numpy.sqrt(numpy.sum(numpy.square(u))) * numpy.sqrt(numpy.sum(numpy.square(v))))
[docs] def score(self) -> Tuple[float, float]: """ Returns the similarity score. """ # Unimplemented R code # alignment <- alignment[alignment[,1] >= x.threshold, ] similarity_score = self._calculate_score(self.alignment) if self.reverse_alignment.empty: reverse_similarity_score = 0.0 else: reverse_similarity_score = self._calculate_score(self.reverse_alignment) return similarity_score, reverse_similarity_score
[docs] def plot( self, top_label: Optional[str] = None, bottom_label: Optional[str] = None, filter: bool = False, # Whether peaks below b should be filtered # noqa: A002 # pylint: disable=redefined-builtin ) -> None: """ Plot the mass spectra head to tail. :param top_label: string to label the top spectrum. :param bottom_label: string to label the bottom spectrum. """ # 3rd party import matplotlib.pyplot as plt # type: ignore[import] # nodep _, ax = plt.subplots() # fig.scatter(top_plot["mz"],top_plot["intensity"], s=0) if filter: ax.vlines(self.top_df["mz"], 0, self.top_df["intensity"], color="blue") ax.vlines(self.bottom_df["mz"], 0, -self.bottom_df["intensity"], color="red") else: ax.vlines(self._top_df_plot["mz"], 0, self._top_df_plot["intensity"], color="blue") ax.vlines(self._bottom_df_plot["mz"], 0, -self._bottom_df_plot["intensity"], color="red") ax.set_ylim(-125, 125) ax.set_xlim(self.xlim[0], self.xlim[1]) ax.axhline(color="black", linewidth=0.5) ax.set_ylabel("Intensity (%)") ax.set_xlabel("m/z", style="italic", family="serif") h_centre = self.xlim[0] + (self.xlim[1] - self.xlim[0]) // 2 ax.text(h_centre, 110, top_label, horizontalalignment="center", verticalalignment="center") ax.text(h_centre, -110, bottom_label, horizontalalignment="center", verticalalignment="center") return ax
# Unimplemented R code # ticks <- c(-100, -50, 0, 50, 100) # plot.window(xlim = c(0, 20), ylim = c(-10, 10)) # # # if(output.list == TRUE) { # # # Grid graphics head to tail plot # # headTailPlot <- function() { # # pushViewport(plotViewport(c(5, 5, 2, 2))) # pushViewport(dataViewport(xscale = xlim, yscale = c(-125, 125))) # # grid.rect() # tmp <- pretty(xlim) # xlabels <- tmp[tmp >= xlim[1] & tmp <= xlim[2]] # grid.xaxis(at = xlabels) # grid.yaxis(at = c(-100, -50, 0, 50, 100)) # # grid.segments(top_plot$mz, # top_plot$intensity, # top_plot$mz, # rep(0, length(top_plot$intensity)), # default.units = "native", # gp = gpar(lwd = 0.75, col = "blue")) # # grid.segments(bottom_plot$mz, # -bottom_plot$intensity, # bottom_plot$mz, # rep(0, length(bottom_plot$intensity)), # default.units = "native", # gp = gpar(lwd = 0.75, col = "red")) # # grid.abline(intercept = 0, slope = 0) # # grid.text("intensity (%)", x = unit(-3.5, "lines"), rot = 90) # grid.text("m/z", y = unit(-3.5, "lines")) # # popViewport(1) # pushViewport(dataViewport(xscale = c(0, 20), yscale = c(-10, 10))) # grid.text(top.label, unit(10, "native"), unit(9, "native")) # grid.text(bottom.label, unit(10, "native"), unit(-9, "native")) # # popViewport(2) # # } # # p <- grid.grabExpr(headTailPlot()) # # } # # with pandas.option_context('display.max_rows', None, 'display.max_columns', None): # print(similarity_score) # Unimplemented R code # # return(list(similarity.score = similarity_score, # alignment = alignment, # plot = p)) #
[docs] def print_alignment(self) -> None: """ Print the dataframe giving aligned peaks in the top and bottom spectra. """ with pandas.option_context("display.max_rows", None, "display.max_columns", None): print(self.alignment)
def _build_dataframe(self, spectrum: numpy.ndarray) -> Tuple[pandas.DataFrame, pandas.DataFrame]: # format spectra and normalize intensitites tmp = pandas.DataFrame(data=spectrum, columns=["mz", "intensity"]) tmp["normalized"] = tmp.apply(normalize, args=(max(tmp["intensity"]), ), axis=1) tmp = tmp[tmp["mz"].between(self.xlim[0], self.xlim[1])] plot = tmp[["mz", "normalized"]].copy() # data frame for plotting spectrum plot.columns = ["mz", "intensity"] out = plot[plot["intensity"] >= self.b] # data frame for similarity score calculation return out, plot
# Deprecated
[docs]def spectrum_similarity( spec_top: numpy.ndarray, spec_bottom: numpy.ndarray, t: float = 0.25, b: float = 10, top_label: Optional[str] = None, bottom_label: Optional[str] = None, xlim: Tuple[int, int] = (50, 1200), x_threshold: float = 0, print_alignment: bool = False, print_graphic: bool = True, output_list: bool = False, ) -> Union[Tuple[float, float], Tuple[float, float, pandas.DataFrame]]: """ Calculate the similarity score for two mass spectra. .. attention:: The :class:`~.SpectrumSimilarity` class is recommended over this function. :param spec_top: Array containing the experimental spectrum's peak list with the *m/z* values in the first column and corresponding intensities in the second :param spec_bottom: Array containing the reference spectrum's peak list with the *m/z* values in the first column and corresponding intensities in the second :param t: numeric value specifying the tolerance used to align the *m/z* values of the two spectra. :param b: numeric value specifying the baseline threshold for peak identification. Expressed as a percent of the maximum intensity. :param top_label: string to label the top spectrum. :param bottom_label: string to label the bottom spectrum. :param xlim: tuple of length 2, defining the beginning and ending values of the x-axis. :param x_threshold: :param print_alignment: whether the intensities should be printed :param print_graphic: :param output_list: whether the intensities should be returned as a third element of the tuple. """ # format spectra and normalize intensitites top_tmp = pandas.DataFrame(data=spec_top, columns=["mz", "intensity"]) top_tmp["normalized"] = top_tmp.apply(normalize, args=(max(top_tmp["intensity"]), ), axis=1) top_tmp = top_tmp[top_tmp["mz"].between(xlim[0], xlim[1])] top_plot = top_tmp[["mz", "normalized"]].copy() # data frame for plotting spectrum top_plot.columns = ["mz", "intensity"] top = top_plot[top_plot["intensity"] >= b] # data frame for similarity score calculation bottom_tmp = pandas.DataFrame(data=spec_bottom, columns=["mz", "intensity"]) bottom_tmp["normalized"] = bottom_tmp.apply(normalize, args=(max(bottom_tmp["intensity"]), ), axis=1) bottom_tmp = bottom_tmp[bottom_tmp["mz"].between(xlim[0], xlim[1])] bottom_plot = bottom_tmp[["mz", "normalized"]].copy() # data frame for plotting spectrum bottom_plot.columns = ["mz", "intensity"] bottom = bottom_plot[bottom_plot["intensity"] >= b] # data frame for similarity score calculation # align the m/z axis of the two spectra, the bottom spectrum is used as the reference # Unimplemented R code # for(i in 1:nrow(bottom)) # top["mz"][bottom["mz"][i] >= top["mz"] - t & bottom["mz"][i] <= top["mz"] + t] = bottom["mz"][i] # top[,1][bottom[,1][i] >= top[,1] - t & bottom[,1][i] <= top[,1] + t] <- bottom[,1][i] # alignment <- merge(top, bottom, by = 1, all = TRUE) # if(length(unique(alignment[,1])) != length(alignment[,1])) warning("the m/z tolerance is set too high") # alignment[,c(2,3)][is.na(alignment[,c(2,3)])] <- 0 # convert NAs to zero (R-Help, Sept. 15, 2004, John Fox) # names(alignment) <- c("mz", "intensity.top", "intensity.bottom") # alignment = pandas.merge(top, bottom, on="mz", how="outer") alignment.fillna(value=0, inplace=True) # Convert NaN to 0 alignment.columns = ["mz", "intensity_top", "intensity_bottom"] if print_alignment: with pandas.option_context("display.max_rows", None, "display.max_columns", None): print(alignment) # similarity score calculation if x_threshold < 0: raise ValueError("x_threshold argument must be zero or a positive number") # Unimplemented R code # alignment <- alignment[alignment[,1] >= x.threshold, ] u = numpy.array(alignment.iloc[:, 1]) v = numpy.array(alignment.iloc[:, 2]) similarity_score = numpy.dot(u, v) / ( numpy.sqrt(numpy.sum(numpy.square(u))) * numpy.sqrt(numpy.sum(numpy.square(v))) ) # Reverse Match reverse_alignment = pandas.merge(top, bottom, on="mz", how="right") reverse_alignment = reverse_alignment.dropna() # Remove rows containing NaN reverse_alignment.columns = ["mz", "intensity_top", "intensity_bottom"] u = numpy.array(reverse_alignment.iloc[:, 1]) v = numpy.array(reverse_alignment.iloc[:, 2]) reverse_similarity_score = numpy.dot(u, v) / ( numpy.sqrt(numpy.sum(numpy.square(u))) * numpy.sqrt(numpy.sum(numpy.square(v))) ) # generate plot if print_graphic: # 3rd party import matplotlib.pyplot as plt # nodep fig, ax = plt.subplots() # fig.scatter(top_plot["mz"],top_plot["intensity"], s=0) ax.vlines(top_plot["mz"], 0, top_plot["intensity"], color="blue") ax.vlines(bottom["mz"], 0, -bottom["intensity"], color="red") ax.set_ylim(-125, 125) ax.set_xlim(xlim[0], xlim[1]) ax.axhline(color="black", linewidth=0.5) ax.set_ylabel("Intensity (%)") ax.set_xlabel("m/z", style="italic", family="serif") h_centre = xlim[0] + (xlim[1] - xlim[0]) // 2 ax.text(h_centre, 110, top_label, horizontalalignment="center", verticalalignment="center") ax.text(h_centre, -110, bottom_label, horizontalalignment="center", verticalalignment="center") plt.show() # Unimplemented R code # ticks <- c(-100, -50, 0, 50, 100) # plot.window(xlim = c(0, 20), ylim = c(-10, 10)) # # # if(output.list == TRUE) { # # # Grid graphics head to tail plot # # headTailPlot <- function() { # # pushViewport(plotViewport(c(5, 5, 2, 2))) # pushViewport(dataViewport(xscale = xlim, yscale = c(-125, 125))) # # grid.rect() # tmp <- pretty(xlim) # xlabels <- tmp[tmp >= xlim[1] & tmp <= xlim[2]] # grid.xaxis(at = xlabels) # grid.yaxis(at = c(-100, -50, 0, 50, 100)) # # grid.segments(top_plot$mz, # top_plot$intensity, # top_plot$mz, # rep(0, length(top_plot$intensity)), # default.units = "native", # gp = gpar(lwd = 0.75, col = "blue")) # # grid.segments(bottom_plot$mz, # -bottom_plot$intensity, # bottom_plot$mz, # rep(0, length(bottom_plot$intensity)), # default.units = "native", # gp = gpar(lwd = 0.75, col = "red")) # # grid.abline(intercept = 0, slope = 0) # # grid.text("intensity (%)", x = unit(-3.5, "lines"), rot = 90) # grid.text("m/z", y = unit(-3.5, "lines")) # # popViewport(1) # pushViewport(dataViewport(xscale = c(0, 20), yscale = c(-10, 10))) # grid.text(top.label, unit(10, "native"), unit(9, "native")) # grid.text(bottom.label, unit(10, "native"), unit(-9, "native")) # # popViewport(2) # # } # # p <- grid.grabExpr(headTailPlot()) # # } # # with pandas.option_context('display.max_rows', None, 'display.max_columns', None): # print(similarity_score) if output_list: return similarity_score, reverse_similarity_score, alignment # Unimplemented R code # # return(list(similarity.score = similarity_score, # alignment = alignment, # plot = p)) # else: return similarity_score, reverse_similarity_score
# simscore <- as.vector((u %*% v)^2 / (sum(u^2) * sum(v^2))) # cos squared # SpectrumSimilarity = spectrum_similarity
[docs]def normalize(row: Union[Mapping, pandas.Series], max_val: Union[float, str]) -> float: """ Returns the normalised intensity for each rows of a :class:`pandas.DataFrame`. :param row: :param max_val: """ # http://jonathansoma.com/lede/foundations/classes/pandas%20columns%20and%20functions/apply-a-function-to-every-row-in-a-pandas-dataframe/ return (row["intensity"] / float(max_val)) * 100.0
[docs]def create_array(intensities: Sequence[float], mz: Sequence[float]) -> numpy.ndarray: """ Create a :class:`numpy.ndarray`, in a format appropriate for :class:`~.SpectrumSimilarity`, from a list of intensities and a list of *m/z* values. :param intensities: List of intensities :param mz: List of *m/z* values. """ # noqa: D400 return numpy.column_stack((mz, intensities))