import pandas as pd
import numpy as np
from scipy.stats import chi2
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import (
silhouette_score,
adjusted_rand_score,
normalized_mutual_info_score,
)
from sklearn.cluster import KMeans
from scipy.spatial.distance import pdist, squareform
from skbio.stats.distance import DistanceMatrix, permanova
from abaco.dataloader import DataTransform
[docs]
def kBET(data, batch_label="batch"):
"""
Compute the k-nearest neighbor batch effect test (kBET).
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
batch_label : str, optional
Column name for batch identifiers, by default 'batch'.
Returns
-------
float
Proportion of samples with p-value > 0.05 (null hypothesis not rejected).
"""
data_otus = data.select_dtypes(include="number")
data_batch = data[batch_label]
n = len(data_batch)
k = int(np.sqrt(data_otus.shape[0])) # k-nearest neighbors
dof = len(data_batch.unique()) - 1 # Degrees of freedom
knn = NearestNeighbors(n_neighbors=k, metric="euclidean")
knn.fit(data_otus)
_, indx = knn.kneighbors(data_otus)
p_values = []
for _j, neighbor in enumerate(indx):
gamma_j = []
for i in data_batch.unique():
mu_ij = (
round(sum(data_batch == i) / n * k + 1e-6) + 1e-6
) # Expected number of samples from batch i in neighbor j
n_ij = 0 # Actual number of samples from batch i in neighbor j
for s in neighbor:
if data_batch.iloc[s] == i:
n_ij += 1 # Identifies batch label on neighborhood
gamma_ij = (n_ij - mu_ij) ** 2 / mu_ij
gamma_j.append(gamma_ij)
sum_gamma_j = sum(gamma_j) # Follows Chi-squared distribution
p_j = 1 - chi2.cdf(sum_gamma_j, dof) # Pearson Chi-squared test
p_values.append(p_j)
return sum(1 for p in p_values if p > 0.05) / n
[docs]
def ASW(data, interest_label="tissue"):
"""
Compute the average silhouette width (ASW) for a given label.
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
interest_label : str, optional
Column name for the label of interest, by default 'tissue'.
Returns
-------
float
Average silhouette score.
"""
average_silhouette = silhouette_score(
data.select_dtypes(include="number"), data[interest_label]
)
return average_silhouette
[docs]
def ARI(data, interest_label="tissue", n_clusters=None):
"""
Compute the Adjusted Rand Index (ARI) between true labels and KMeans clusters.
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
interest_label : str, optional
Column name for the label of interest, by default 'tissue'.
n_clusters : int, optional
Number of clusters for KMeans. If None, uses the number of unique labels.
Returns
-------
float
Adjusted Rand Index score.
"""
data_otus = data.select_dtypes(include="number") # OTUs
data_bio = data[interest_label] # Labels
if n_clusters is None:
kmeans = KMeans(
n_clusters=len(set(data_bio)), random_state=42
) # KMeans clustering
else:
kmeans = KMeans(
n_clusters=n_clusters, random_state=42
) # KMeans clustering w/ n clusters
predicted_clusters = kmeans.fit_predict(data_otus) # Predicting label of cluster
ari = adjusted_rand_score(data_bio, predicted_clusters) # ARI
return ari
[docs]
def NMI(data, bio_label, n_cluster=None, average_method="arithmetic"):
"""
Compute the Normalized Mutual Information (NMI) between true labels and KMeans clusters.
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
bio_label : str
Column name for biological labels.
n_cluster : int, optional
Number of clusters for KMeans. If None, uses the number of unique labels.
average_method : str, optional
Method for averaging NMI, by default 'arithmetic'.
Returns
-------
float
Normalized Mutual Information score.
"""
taxa_matrix = data.select_dtypes(include="number")
true_labels = data[bio_label]
# Decide number of clusters
k = n_cluster if n_cluster is not None else len(true_labels.unique())
kmeans = KMeans(n_clusters=k, random_state=42)
pred_labels = kmeans.fit_predict(taxa_matrix)
# Compute NMI
nmi_score = normalized_mutual_info_score(
true_labels, pred_labels, average_method=average_method
)
return nmi_score
[docs]
def all_metrics(data, bio_label, batch_label, n_cluster=None):
"""
Compute all batch correction and biological conservation metrics.
Batch correction:
kBET, ARI (batch), ASW (batch)
Biological conservation:
NMI, ARI (bio), ASW (bio)
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
bio_label : str
Column name for biological labels.
batch_label : str
Column name for batch identifiers.
n_cluster : int, optional
Number of clusters for KMeans. If None, uses the number of unique labels.
Returns
-------
tuple of dict
(batch_metrics, bio_metrics)
batch_metrics: dict with keys 'kBET', 'ARI', 'ASW'
bio_metrics: dict with keys 'NMI', 'ARI', 'ASW'
"""
# Batch correction metrics
kbet = kBET(data, batch_label=batch_label)
ari_batch = 1 - ARI(data, interest_label=batch_label, n_clusters=n_cluster)
asw_batch = 1 - ASW(data, interest_label=batch_label)
# Biological conservation metrics
nmi = NMI(data, bio_label=bio_label)
ari_bio = ARI(data, interest_label=bio_label, n_clusters=n_cluster)
asw_bio = ASW(data, interest_label=bio_label)
# Return dictionary with all values
batch_metrics = {"kBET": kbet, "ARI": ari_batch, "ASW": asw_batch}
bio_metrics = {"NMI": nmi, "ARI": ari_bio, "ASW": asw_bio}
return batch_metrics, bio_metrics
# New reviewd functions
# cLISI = iLISI but the labels are just interchanged
[docs]
def cLISI_raw(
data: pd.DataFrame,
bio_label: str,
k: int = None,
):
"""
Compute non-normalized cell-type LISI (cLISI).
Parameters
----------
data : pandas.DataFrame
DataFrame containing normalized numeric type taxonomic groups and categorical columns.
bio_label : str
Name of the column with the categorical bio-type labels.
k : int, optional
Neighborhood size. By default, uses the square root of the number of samples.
Returns
-------
float
Mean raw cLISI across all samples (range: 1 to number of unique bio-type labels).
"""
# Extract numerico matrix and labels
data_otus = data.select_dtypes(include="number").values
data_bio = data[bio_label].values
n_samples = data_otus.shape[0]
# Determine k
if k is None:
k = max(1, int(np.sqrt(n_samples)))
# Build kNN graph in Euclidean space
knn = NearestNeighbors(n_neighbors=k, metric="euclidean")
knn.fit(data_otus)
_, neighbors = knn.kneighbors(data_otus, return_distance=True)
# Compute per-biological type inverse Simpson's index
isi_list = []
for idx in neighbors:
# Count frequency of each label in the neighborhood
counts = pd.Series(data_bio[idx]).value_counts().values
# Get probability of label in the neighborhood
probs = counts / counts.sum()
# Simpson's diversity: sum of squared probabilities
si = np.sum(probs**2)
# Inverse simpson's index
isi = 1.0 / si
isi_list.append(isi)
# Return the average raw cLISI
cLISI = np.mean(isi_list)
return cLISI
[docs]
def cLISI_norm(
data: pd.DataFrame,
bio_label: str,
k: int = None,
n_bio=None,
):
"""
Compute normalized cell-type LISI (cLISI) in [0, 1].
Parameters
----------
data : pandas.DataFrame
DataFrame containing normalized numeric type taxonomic groups and categorical columns.
bio_label : str
Name of the column with the categorical bio-type labels.
k : int, optional
Neighborhood size. By default, uses the square root of the number of samples.
n_bio : int, optional
Number of biological labels. If not provided, inferred from data.
Returns
-------
float
Mean normalized cLISI across all samples (range: 0 to 1).
"""
# Compute raw cLISI score
raw = cLISI_raw(data=data, bio_label=bio_label, k=k)
# Number of distinct bio-type labels
if n_bio is None:
n_bio = data[bio_label].nunique()
# In case there is only 1 biological group
if n_bio < 2:
return 1.0
# Normalized rank from 0 to 1
clisi_norm = (n_bio - raw) / (n_bio - 1)
return clisi_norm
[docs]
def cLISI_full_rank(
data: pd.DataFrame,
bio_label: str,
n_bio=None,
perplexities: list = None,
):
"""
Compute normalized cLISI for a range of perplexities.
Parameters
----------
data : pandas.DataFrame
DataFrame containing normalized numeric type taxonomic groups and categorical columns.
bio_label : str
Name of the column with the categorical bio-type labels.
n_bio : int, optional
Number of biological labels. If not provided, inferred from data.
perplexities : list of int, optional
List of neighborhood sizes (k) to use. If None, uses all possible k.
Returns
-------
pandas.DataFrame
DataFrame with columns ['perplexity', 'cLISI'] for each k.
"""
# Define perplexities to be used
if perplexities is None:
perplexities = [i + 1 for i in range(data.shape[0])]
clisis = []
for k in perplexities:
clisi = cLISI_norm(data=data, bio_label=bio_label, k=k)
clisis.append({"perplexity": k, "cLISI": clisi})
clisi_table = pd.DataFrame(clisis)
return clisi_table
[docs]
def iLISI_raw(
data: pd.DataFrame,
batch_label: str,
k: int = None,
):
"""
Compute non-normalized batch-mixing LISI (iLISI).
Parameters
----------
data : pandas.DataFrame
DataFrame containing normalized numeric type taxonomic groups and categorical columns.
batch_label : str
Name of the column with the categorical batch-type labels.
k : int, optional
Neighborhood size. By default, uses the square root of the number of samples.
Returns
-------
float
Mean raw iLISI across all samples (range: 1 to number of unique batch-type labels).
"""
# Extract numerico matrix and labels
data_otus = data.select_dtypes(include="number").values
data_batch = data[batch_label].values
n_samples = data_otus.shape[0]
# Determine k
if k is None:
k = max(1, int(np.sqrt(n_samples)))
# Build kNN graph in Euclidean space
knn = NearestNeighbors(n_neighbors=k, metric="euclidean")
knn.fit(data_otus)
_, neighbors = knn.kneighbors(data_otus, return_distance=True)
# Compute per-biological type inverse Simpson's index
isi_list = []
for idx in neighbors:
# Count frequency of each label in the neighborhood
counts = pd.Series(data_batch[idx]).value_counts().values
# Get probability of label in the neighborhood
probs = counts / counts.sum()
# Simpson's diversity: sum of squared probabilities
si = np.sum(probs**2)
# Inverse simpson's index
isi = 1.0 / si
isi_list.append(isi)
# Return the average raw cLISI
iLISI = np.mean(isi_list)
return iLISI
[docs]
def iLISI_norm(
data: pd.DataFrame,
batch_label: str,
k: int = None,
n_batch: int = None,
):
"""
Compute normalized batch-mixing LISI (iLISI) in [0, 1].
Parameters
----------
data : pandas.DataFrame
DataFrame containing normalized numeric type taxonomic groups and categorical columns.
batch_label : str
Name of the column with the categorical batch-type labels.
k : int, optional
Neighborhood size. By default, uses the square root of the number of samples.
n_batch : int, optional
Number of batch labels. If not provided, inferred from data.
Returns
-------
float
Mean normalized iLISI across all samples (range: 0 to 1).
"""
# Compute raw iLISI score
raw = iLISI_raw(data=data, batch_label=batch_label, k=k)
# Number of distinct batch-type labels
if n_batch is None:
n_batch = data[batch_label].nunique()
# In case there is only 1 batch group
if n_batch < 2:
return 1.0
# Normalized rank from 0 to 1
ilisi_norm = (raw - 1) / (n_batch - 1)
return ilisi_norm
[docs]
def iLISI_full_rank(
data: pd.DataFrame,
batch_label: str,
n_batch=None,
perplexities: list = None,
):
"""
Compute normalized iLISI for a range of perplexities.
Parameters
----------
data : pandas.DataFrame
DataFrame containing normalized numeric type taxonomic groups and categorical columns.
batch_label : str
Name of the column with the categorical batch-type labels.
n_batch : int, optional
Number of batch labels. If not provided, inferred from data.
perplexities : list of int, optional
List of neighborhood sizes (k) to use. If None, uses all possible k.
Returns
-------
pandas.DataFrame
DataFrame with columns ['perplexity', 'iLISI'] for each k.
"""
# Define perplexities to be used
if perplexities is None:
perplexities = [i + 1 for i in range(data.shape[0])]
ilisis = []
for k in perplexities:
ilisi = iLISI_norm(data=data, batch_label=batch_label, k=k)
ilisis.append({"perplexity": k, "iLISI": ilisi})
ilisi_table = pd.DataFrame(ilisis)
return ilisi_table
# Pairwise distance
[docs]
def pairwise_distance(data, sample_label, batch_label, bio_label):
"""
Compute mean pairwise Euclidean distances within and between biological groups.
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
sample_label : str
Column name for sample identifiers.
batch_label : str
Column name for batch identifiers.
bio_label : str
Column name for biological group identifiers.
Returns
-------
tuple of float
(mean_all_dists, mean_within_dists, mean_between_dists)
"""
# Step 1: normalized data to compute euclidean-like distance
norm_data = DataTransform(
data,
factors=[sample_label, batch_label, bio_label],
transformation="CLR",
count=True,
)
norm_counts = norm_data.select_dtypes(include="number").values
sample_ids = norm_data[sample_label].values
bios = norm_data[bio_label].values
# Step 2: compute pairwise distance (euclidean) to every point
pair_dists = squareform(pdist(norm_counts, metric="euclidean"))
pair_dists_df = (
pd.DataFrame(
pair_dists, index=norm_data[sample_label], columns=norm_data[sample_label]
)
.reset_index()
.rename(columns={sample_label: "pointA"})
)
# Get longer dataframe
long_dists_df = pair_dists_df.melt(
id_vars="pointA", var_name="pointB", value_name="distance"
)
# Remove distances to the same point
long_dists_df = long_dists_df.query("pointA != pointB")
# Add biological group information
bio_map = dict(zip(sample_ids, bios, strict=False))
long_dists_df["pointA_bio"] = long_dists_df["pointA"].map(bio_map)
long_dists_df["pointB_bio"] = long_dists_df["pointB"].map(bio_map)
# Step 3: compute pairwise distance within and between every group
within_bios = []
between_bios = []
for _idx, point in long_dists_df.iterrows():
if point["pointA_bio"] == point["pointB_bio"]:
within_bios.append(point["distance"])
else:
between_bios.append(point["distance"])
within_bios = np.array(within_bios)
between_bios = np.array(between_bios)
# Step 5: summarize
mean_all_dists = long_dists_df["distance"].mean()
mean_within_dists = within_bios.mean()
mean_between_dists = between_bios.mean()
return mean_all_dists, mean_within_dists, mean_between_dists
[docs]
def pairwise_distance_std(data, sample_label, batch_label, bio_label):
"""
Compute standard deviation of pairwise Euclidean distances within and between biological groups.
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
sample_label : str
Column name for sample identifiers.
batch_label : str
Column name for batch identifiers.
bio_label : str
Column name for biological group identifiers.
Returns
-------
tuple of float
(std_all_dists, std_within_dists, std_between_dists)
"""
# Step 1: normalized data to compute euclidean-like distance
norm_data = DataTransform(
data,
factors=[sample_label, batch_label, bio_label],
transformation="CLR",
count=True,
)
norm_counts = norm_data.select_dtypes(include="number").values
sample_ids = norm_data[sample_label].values
bios = norm_data[bio_label].values
# Step 2: compute pairwise distance (euclidean) to every point
pair_dists = squareform(pdist(norm_counts, metric="euclidean"))
pair_dists_df = (
pd.DataFrame(
pair_dists, index=norm_data[sample_label], columns=norm_data[sample_label]
)
.reset_index()
.rename(columns={sample_label: "pointA"})
)
# Get longer dataframe
long_dists_df = pair_dists_df.melt(
id_vars="pointA", var_name="pointB", value_name="distance"
)
# Remove distances to the same point
long_dists_df = long_dists_df.query("pointA != pointB")
# Add biological group information
bio_map = dict(zip(sample_ids, bios, strict=False))
long_dists_df["pointA_bio"] = long_dists_df["pointA"].map(bio_map)
long_dists_df["pointB_bio"] = long_dists_df["pointB"].map(bio_map)
# Step 3: compute pairwise distance within and between every group
within_bios = []
between_bios = []
for _idx, point in long_dists_df.iterrows():
if point["pointA_bio"] == point["pointB_bio"]:
within_bios.append(point["distance"])
else:
between_bios.append(point["distance"])
within_bios = np.array(within_bios)
between_bios = np.array(between_bios)
# Step 5: summarize
mean_all_dists = long_dists_df["distance"].std()
mean_within_dists = within_bios.std()
mean_between_dists = between_bios.std()
return mean_all_dists, mean_within_dists, mean_between_dists
[docs]
def pairwise_distance_multi_run(data, sample_label, batch_label, bio_label):
"""
Compute all pairwise Euclidean distances and return as a long-form DataFrame.
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
sample_label : str
Column name for sample identifiers.
batch_label : str
Column name for batch identifiers.
bio_label : str
Column name for biological group identifiers.
Returns
-------
pandas.DataFrame
Long-form DataFrame with columns ['pointA', 'pointB', 'distance', 'pointA_bio', 'pointB_bio'].
"""
# Step 1: normalized data to compute euclidean-like distance
norm_data = DataTransform(
data,
factors=[sample_label, batch_label, bio_label],
transformation="CLR",
count=True,
)
norm_counts = norm_data.select_dtypes(include="number").values
sample_ids = norm_data[sample_label].values
bios = norm_data[bio_label].values
# Step 2: compute pairwise distance (euclidean) to every point
pair_dists = squareform(pdist(norm_counts, metric="euclidean"))
pair_dists_df = (
pd.DataFrame(
pair_dists, index=norm_data[sample_label], columns=norm_data[sample_label]
)
.reset_index()
.rename(columns={sample_label: "pointA"})
)
# Get longer dataframe
long_dists_df = pair_dists_df.melt(
id_vars="pointA", var_name="pointB", value_name="distance"
)
# Remove distances to the same point
long_dists_df = long_dists_df.query("pointA != pointB")
# Add biological group information
bio_map = dict(zip(sample_ids, bios, strict=False))
long_dists_df["pointA_bio"] = long_dists_df["pointA"].map(bio_map)
long_dists_df["pointB_bio"] = long_dists_df["pointB"].map(bio_map)
return long_dists_df
[docs]
def PERMANOVA(data, sample_label, batch_label, bio_label):
"""
Perform PERMANOVA (permutational multivariate analysis of variance) using Bray-Curtis and Aitchison distances.
Parameters
----------
data : pandas.DataFrame
Input data containing OTU counts and metadata.
sample_label : str
Column name for sample identifiers.
batch_label : str
Column name for batch identifiers.
bio_label : str
Column name for biological group identifiers.
Returns
-------
tuple
(res_bc, res_ait)
res_bc : pandas.Series
PERMANOVA results for Bray-Curtis distance.
res_ait : pandas.Series
PERMANOVA results for Aitchison distance.
"""
samples = data[sample_label].values
bios = data[bio_label].values
counts = data.select_dtypes(include="number").values + 1e-6
# Compute Bray-Curtis distance matrix
bray = pdist(counts, metric="braycurtis")
dist_mat = squareform(bray)
dm = DistanceMatrix(dist_mat, ids=samples)
# PERMANOVA - Bray-curtis
res_bc = permanova(distance_matrix=dm, grouping=bios)
# Compute manually R^2 = F(k-1) / (F(k - 1) + (N-k)) , k = number of groups, N = number of samples, F = F-statistic (test statistic)
res_bc["R2"] = (
res_bc["test statistic"]
* (len(bios.unique()) - 1)
/ (
res_bc["test statistic"] * (len(bios.unique()) - 1)
+ (len(samples) - len(bios.unique()))
)
)
# Compute Aitchison distance matrix
norm_data = DataTransform(
data,
factors=[sample_label, batch_label, bio_label],
transformation="CLR",
count=True,
)
norm_counts = norm_data.select_dtypes(include="number").values
aitch = pdist(norm_counts, metric="euclidean")
dist_mat = squareform(aitch)
dm = DistanceMatrix(dist_mat, ids=samples)
# PERMANOVA - Aitchison
res_ait = permanova(distance_matrix=dm, grouping=bios)
# Compute manually R^2 = F(k-1) / (F(k - 1) + (N-k)) , k = number of groups, N = number of samples, F = F-statistic (test statistic)
res_ait["R2"] = (
res_ait["test statistic"]
* (len(bios.unique()) - 1)
/ (
res_ait["test statistic"] * (len(bios.unique()) - 1)
+ (len(samples) - len(bios.unique()))
)
)
return res_bc, res_ait