Source code for abaco.plots

import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from skbio.stats.ordination import pcoa
from scipy.spatial.distance import pdist, squareform
import plotly.graph_objects as go
from plotly.validator_cache import ValidatorCache
from plotly.subplots import make_subplots
from clustergrammer2 import Network, CGM2
from abaco.dataloader import DataTransform


[docs] def plotPCoA( data, method="aitchison", sample_label="sample", batch_label="batch", experiment_label="tissue", mode="base", ): """ Plot Principal Coordinates Analysis (PCoA) for batch effect visualization. Parameters ---------- data : pandas.DataFrame Input data containing OTU counts and metadata. method : str, optional Distance metric to use ('aitchison' or 'bray-curtis'), by default 'aitchison'. sample_label : str, optional Column name for sample identifiers, by default 'sample'. batch_label : str, optional Column name for batch identifiers, by default 'batch'. experiment_label : str, optional Column name for experiment/tissue identifiers, by default 'tissue'. mode : str, optional Plotting mode ('base' for batch+experiment, 'single' for batch only), by default 'base'. Returns ------- None Displays a Plotly figure. """ if method == "aitchison": # CLR transform df = DataTransform( data, factors=[sample_label, batch_label, experiment_label], transformation="CLR", count=True, ) # log_transformed = np.log(data.select_dtypes(include = "number") + 1e-9) # clr_data = log_transformed - log_transformed.mean(axis=1)[:, None] # print(clr_data) # Extracting numerical data df_otu = df.select_dtypes(include="number") # Compute Aitchison distances distances = pdist(df_otu, "euclidean") distances = squareform(distances) elif method == "bray-curtis": # Extract numeric data (e.g., OTU count data). df_otu = data.select_dtypes(include="number") # Convert each sample's counts to relative abundances (row sums are normalized to 1) # (Handling potential division by zero) row_sums = df_otu.sum(axis=1) df_rel = df_otu.div(row_sums.replace(0, np.nan), axis=0).fillna(0) # Compute Bray-Curtis distances distances = pdist(df_rel, metric="braycurtis") distances = squareform(distances) else: raise (ValueError(f"Method provided not valid: {method}")) # PCoA pcoa_res = pcoa(distances) # Construct DataFrame with principal components and metadata df_pcoa = pd.DataFrame(pcoa_res.samples[["PC1", "PC2"]], columns=["PC1", "PC2"]) df_pcoa.index = ( data.index ) # This assures index are the same and both DataFrames are perfectly aligned df_pcoa[[sample_label, batch_label, experiment_label]] = data[ [sample_label, batch_label, experiment_label] ] # df_pcoa = pd.concat([data[[sample_label, batch_label, experiment_label]], df_pcoa], axis=1) # Extracting available symbols to be used per experiment SymbolValidator = ValidatorCache.get_validator("scatter.marker", "symbol") raw_symbols = SymbolValidator.values[2::12] # Defining a set of colors to be used for batches raw_colors = [ "blue", "red", "green", "orange", "purple", "cyan", "magenta", "yellow", "black", "brown", "pink", "gray", "olive", "teal", "navy", "maroon", "gold", "lime", "indigo", "violet", "coral", "slateblue", "aquamarine", "crimson", "sienna", "salmon", "turquoise", "lavender", "chocolate", "tomato", "plum", "peru", "khaki", "orchid", "springgreen", "steelblue", "seagreen", "darkblue", "darkred", "darkgreen", "darkorange", "darkviolet", "mediumblue", "mediumvioletred", "mediumseagreen", "midnightblue", "lightblue", "lightgreen", "lightcoral", "peachpuff", ] # Adding symbol to corresponding experiment df_pcoa["marker"] = None for n, exp in enumerate(df_pcoa[experiment_label].unique()): df_pcoa.loc[df_pcoa[experiment_label] == exp, "marker"] = raw_symbols[n] # Adding color to corresponding batch df_pcoa["color"] = None for n, batch in enumerate(df_pcoa[batch_label].unique()): df_pcoa.loc[df_pcoa[batch_label] == batch, "color"] = raw_colors[n] # Creating the plotly figure fig = go.Figure() if mode == "base": # Creating a for loop to alocate PCA data points per batch for batch in df_pcoa[batch_label].unique(): # Creating a for loop to alocate data points per experiment in the current batch for exp in df_pcoa[experiment_label].unique(): # Ploting the points corresponding to the current batch and tissue fig.add_trace( go.Scatter( x=df_pcoa[ (df_pcoa[batch_label] == batch) & (df_pcoa[experiment_label] == exp) ]["PC1"], y=df_pcoa[ (df_pcoa[batch_label] == batch) & (df_pcoa[experiment_label] == exp) ]["PC2"], marker=dict( color=df_pcoa[ (df_pcoa[batch_label] == batch) & (df_pcoa[experiment_label] == exp) ]["color"], size=8, ), marker_symbol=df_pcoa[ (df_pcoa[batch_label] == batch) & (df_pcoa[experiment_label] == exp) ]["marker"], legendgroup=batch, legendgrouptitle_text="Batch {}".format(batch), name=exp, mode="markers", ) ) # fig.update_layout(xaxis_range = [-5, 5], # yaxis_range = [-5, 5]) return fig.show() elif mode == "single": # Creating a for loop to alocate PCA data points per batch for batch in df_pcoa[batch_label].unique(): # Ploting the points corresponding to the current batch and tissue fig.add_trace( go.Scatter( x=df_pcoa[(df_pcoa[batch_label] == batch)]["PC1"], y=df_pcoa[(df_pcoa[batch_label] == batch)]["PC2"], marker=dict( color=df_pcoa[(df_pcoa[batch_label] == batch)]["color"], size=8, ), legendgroup=batch, legendgrouptitle_text="Batch {}".format(batch), name=batch, mode="markers", ) ) # fig.update_layout(xaxis_range = [-5, 5], # yaxis_range = [-5, 5]) return fig.show()
[docs] def plotPCA( data, sample_label="sample", batch_label="batch", experiment_label="tissue" ): """ Plot Principal Component Analysis (PCA) for batch effect visualization. Parameters ---------- data : pandas.DataFrame Input data containing OTU counts and metadata. sample_label : str, optional Column name for sample identifiers, by default 'sample'. batch_label : str, optional Column name for batch identifiers, by default 'batch'. experiment_label : str, optional Column name for experiment/tissue identifiers, by default 'tissue'. Returns ------- None Displays a Plotly figure. """ # Realize the PCA pca = PCA(n_components=2) principal_components = pca.fit_transform(data.select_dtypes(include="number")) df_pca = pd.DataFrame(data=principal_components, columns=["PC1", "PC2"]) df_pca.index = ( data.index ) # This assures index are the same and both DataFrames are perfectly aligned # Add the labels from batch and other important information df_pca[[sample_label, batch_label, experiment_label]] = data[ [sample_label, batch_label, experiment_label] ] # Extracting available symbols to be used per experiment SymbolValidator = ValidatorCache.get_validator("scatter.marker", "symbol") raw_symbols = SymbolValidator.values[2::12] # Defining a set of colors to be used for batches raw_colors = [ "blue", "red", "green", "orange", "purple", "cyan", "magenta", "yellow", "black", "brown", "pink", "gray", "olive", "teal", "navy", "maroon", "gold", "lime", "indigo", "violet", "coral", "slateblue", "aquamarine", "crimson", "sienna", "salmon", "turquoise", "lavender", "chocolate", "tomato", "plum", "peru", "khaki", "orchid", "springgreen", "steelblue", "seagreen", "darkblue", "darkred", "darkgreen", "darkorange", "darkviolet", "mediumblue", "mediumvioletred", "mediumseagreen", "midnightblue", "lightblue", "lightgreen", "lightcoral", "peachpuff", ] # Adding symbol to corresponding experiment df_pca["marker"] = None for n, exp in enumerate(df_pca[experiment_label].unique()): df_pca.loc[df_pca[experiment_label] == exp, "marker"] = raw_symbols[n] # Adding color to corresponding batch df_pca["color"] = None for n, batch in enumerate(df_pca[batch_label].unique()): df_pca.loc[df_pca[batch_label] == batch, "color"] = raw_colors[n] # Creating the plotly figure fig = go.Figure() # Creating a for loop to alocate PCA data points per batch for batch in df_pca[batch_label].unique(): # Creating a for loop to alocate data points per experiment in the current batch for exp in df_pca[experiment_label].unique(): # Ploting the points corresponding to the current batch and tissue fig.add_trace( go.Scatter( x=df_pca[ (df_pca[batch_label] == batch) & (df_pca[experiment_label] == exp) ]["PC1"], y=df_pca[ (df_pca[batch_label] == batch) & (df_pca[experiment_label] == exp) ]["PC2"], marker=dict( color=df_pca[ (df_pca[batch_label] == batch) & (df_pca[experiment_label] == exp) ]["color"], size=8, ), marker_symbol=df_pca[ (df_pca[batch_label] == batch) & (df_pca[experiment_label] == exp) ]["marker"], legendgroup=batch, legendgrouptitle_text="Batch {}".format(batch), name=exp, mode="markers", ) ) # fig.update_layout(xaxis_range = [-5, 5], # yaxis_range = [-5, 5]) fig.update_layout(legend=dict(font=dict(size=8), itemwidth=30)) return fig.show()
[docs] def plotOTUBox(data, batch_label="batch"): """ Plot boxplots of OTU abundances grouped by batch. Parameters ---------- data : pandas.DataFrame Input data containing OTU counts and metadata. batch_label : str, optional Column name for batch identifiers, by default 'batch'. Returns ------- None Displays a Plotly figure with dropdown to select OTUs. """ # Extract OTUs columns names otu_cols = [col for col in data.columns if col.startswith("OTU")] batch_labels = data[batch_label].unique() batch_len = len(batch_labels) # Converting DataFrame from wide to long df_long = pd.melt( data, id_vars=[batch_label], value_vars=otu_cols, var_name="OTU", value_name="value", ) # Defining a set of colors to be used for batches raw_colors = ["blue", "red", "green", "orange", "purple"] # Adding color to corresponding batch batch_colors = [] for i in range(batch_len): batch_colors.append(raw_colors[i]) fig = go.Figure() # Add traces for each OTU for otu in otu_cols: for i, batch in enumerate(batch_labels): fig.add_trace( go.Box( x=df_long[ (df_long["OTU"] == otu) & (df_long[batch_label] == batch) ][batch_label], y=df_long[ (df_long["OTU"] == otu) & (df_long[batch_label] == batch) ]["value"], marker=dict( color=batch_colors[i] ), # Apply color to the batch boxplot name=f"Batch {batch}, {otu}", # Label each trace by the OTU visible=False, # Set initially to invisible ) ) # First OTU visible by default for i in range(batch_len): fig.data[i].visible = True # Add dropdown to select which OTU to display fig.update_layout( xaxis_title="Batch", updatemenus=[ dict( buttons=[ *[ dict( args=[ { "visible": [ (i >= batch_len * idx) & (i <= batch_len * idx + (batch_len - 1)) for i in range(len(fig.data)) ] } ], # Toggle visibility label=otu, method="update", ) for idx, otu in enumerate(otu_cols) ] ], direction="down", showactive=True, xanchor="left", y=1.15, yanchor="top", ) ], ) return fig.show()
[docs] def plotRLE( data, sample_label="sample", batch_label="batch", experiment_label="tissue" ): """ Plot Relative Log Expression (RLE) boxplots for each experiment and batch. Parameters ---------- data : pandas.DataFrame Input data containing OTU counts and metadata. sample_label : str, optional Column name for sample identifiers, by default 'sample'. batch_label : str, optional Column name for batch identifiers, by default 'batch'. experiment_label : str, optional Column name for experiment/tissue identifiers, by default 'tissue'. Returns ------- None Displays a Plotly figure with dropdown to select experiments. """ # Extract OTUs column names otu_cols = [col for col in data.columns if col.startswith("OTU")] # Converting DataFrame from wide to long df_long = pd.melt( data, id_vars=[sample_label, batch_label, experiment_label], value_vars=otu_cols, var_name="OTU", value_name="value", ) # Calculating the medians of each OTU within each experiment df_long["medians"] = None for OTU in df_long["OTU"].unique(): for exp in df_long[experiment_label].unique(): med = np.median( df_long[(df_long["OTU"] == OTU) & (df_long[experiment_label] == exp)][ "value" ] ) df_long.loc[ (df_long["OTU"] == OTU) & (df_long[experiment_label] == exp), "medians" ] = med # Incorporating the difference between OTU value in each sample and the median across all samples from the same tissue df_long["RLE"] = df_long["value"] - df_long["medians"] # Defining a set of colors to be used for batches raw_colors = ["blue", "red", "green", "orange", "purple"] # Adding color to corresponding batch df_long["color"] = None for n, batch in enumerate(df_long[batch_label].unique()): df_long.loc[df_long[batch_label] == batch, "color"] = raw_colors[n] # Generate RLE plots for each experiment fig = go.Figure() # Add traces for each experiment for exp in df_long[experiment_label].unique(): # Add traces for each batch for batch in df_long[batch_label].unique(): fig.add_trace( go.Box( x=df_long[ (df_long[experiment_label] == exp) & (df_long[batch_label] == batch) ][sample_label], y=df_long[ (df_long[experiment_label] == exp) & (df_long[batch_label] == batch) ]["RLE"], marker_color=df_long[ (df_long[experiment_label] == exp) & (df_long[batch_label] == batch) ]["color"].iloc[0], name="Batch {}".format(batch), # Label each trace by the batch visible=False, # Set initially to invisible ) ) # First experiment's traces visible by default for i in range(len(df_long[batch_label].unique())): fig.data[i].visible = True # Add dropdown to select which experiment to display fig.update_layout( updatemenus=[ dict( buttons=[ *[ dict( args=[ { "visible": [ i // len(df_long[batch_label].unique()) == idx for i in range(len(fig.data)) ] } ], # Toggle visibility label=exp, method="update", ) for idx, exp in enumerate(df_long[experiment_label].unique()) ] ], direction="down", showactive=True, xanchor="left", y=1.15, yanchor="top", ) ] ) # Add horizontal dashed red line at y = 0 as a reference point fig.add_shape( type="line", x0=0, x1=1, # Extend the line across the x-axis y0=0, y1=0, # Line positioned at y = 0 xref="paper", yref="y", # "paper" allows the line to span the entire plot width line=dict(color="red", width=2, dash="dash"), # Dashed red line ) return fig.show()
[docs] def plotClusterHeatMap( data, batch_label="batch", experiment_label="tissue", sample_label="sample" ): """ Plot a clustered heatmap of scaled OTU data with batch and experiment metadata. Parameters ---------- data : pandas.DataFrame Input data containing OTU counts and metadata. batch_label : str, optional Column name for batch identifiers, by default 'batch'. experiment_label : str, optional Column name for experiment/tissue identifiers, by default 'tissue'. sample_label : str, optional Column name for sample identifiers, by default 'sample'. Returns ------- clustergrammer2.CGM2Widget Clustergrammer2 widget displaying the clustered heatmap. """ # Extracts numerical and categorical data of interest data_num = data.select_dtypes(include="number") data_num.index = [str(i) for i in data[sample_label]] data_cat = data[[batch_label, experiment_label]] data_cat.index = [str(i) for i in data[sample_label]] # First scaling process - Ensures every observation is scaled according to OTUs scaler = StandardScaler() scaled_data = scaler.fit_transform(data_num) scaled_data = pd.DataFrame( scaled_data, columns=data_num.columns, index=data_num.index ) # Second scaling process - Ensures every observation is scaled according to sample scaler = StandardScaler() scaled_data = scaler.fit_transform(scaled_data.T) scaled_data = pd.DataFrame( scaled_data, index=[str(i) for i in data_num.columns], columns=[str(i) for i in data_num.index], ) # Create Clustergrammer2 plot n2 = Network(CGM2) n2.load_df(scaled_data, meta_col=data_cat) n2.cluster() return n2.widget()
[docs] def plot_LISI_perplexity( df_c, df_i, n_samples: int, x_col: str = "perplexity", y_col_c: str = "cLISI", y_col_i: str = "iLISI", title_c: str = "Biological conservation (cLISI)", title_i: str = "Batch mixing (iLISI)", ): """ Plot cLISI and iLISI scores as a function of perplexity. Parameters ---------- df_c : pandas.DataFrame DataFrame containing cLISI scores and perplexity values. df_i : pandas.DataFrame DataFrame containing iLISI scores and perplexity values. n_samples : int Number of samples in the dataset. x_col : str, optional Column name for perplexity values, by default 'perplexity'. y_col_c : str, optional Column name for cLISI scores, by default 'cLISI'. y_col_i : str, optional Column name for iLISI scores, by default 'iLISI'. title_c : str, optional Title for the cLISI subplot, by default 'Biological conservation (cLISI)'. title_i : str, optional Title for the iLISI subplot, by default 'Batch mixing (iLISI)'. Returns ------- plotly.graph_objs._figure.Figure Plotly figure with cLISI and iLISI subplots. """ ideal_k = max(int(np.sqrt(n_samples)), 1) # x-axis limits xmin = min(df_c[x_col].min(), df_i[x_col].min()) - 0.5 xmax = max(df_c[x_col].max(), df_i[x_col].max()) + 0.5 # y-intersection y_c = np.interp(ideal_k, df_c[x_col].values, df_c[y_col_c].values) y_i = np.interp(ideal_k, df_i[x_col].values, df_i[y_col_i].values) # Create 1×2 subplots fig = make_subplots( rows=1, cols=2, shared_yaxes=False, shared_xaxes=True, subplot_titles=(title_c, title_i), ) # cLISI trace fig.add_trace( go.Scatter( x=df_c[x_col], y=df_c[y_col_c], mode="lines+markers", name="cLISI", line=dict(color="green"), marker=dict(color="green"), ), row=1, col=1, ) # ideal point in cLISI fig.add_shape( dict( type="line", x0=ideal_k, y0=0, x1=ideal_k, y1=y_c, line=dict(color="red", dash="dash"), ), row=1, col=1, ) fig.add_shape( dict( type="line", x0=xmin, y0=y_c, x1=ideal_k, y1=y_c, line=dict(color="red", dash="dash"), ), row=1, col=1, ) fig.add_annotation( dict( x=ideal_k, y=y_c, xref="x1", yref="y1", text=f"{y_c:.2f}", showarrow=True, arrowhead=1, ax=30, ay=-30, font=dict(color="red"), ) ) # iLISI trace fig.add_trace( go.Scatter( x=df_i[x_col], y=df_i[y_col_i], mode="lines+markers", name="iLISI", line=dict(color="blue"), marker=dict(color="blue"), ), row=1, col=2, ) # Ideal line in iLISI fig.add_shape( dict( type="line", x0=ideal_k, y0=0, x1=ideal_k, y1=y_i, line=dict(color="red", dash="dash"), ), row=1, col=2, ) fig.add_shape( dict( type="line", x0=xmin, y0=y_i, x1=ideal_k, y1=y_i, line=dict(color="red", dash="dash"), ), row=1, col=2, ) fig.add_annotation( dict( x=ideal_k, y=y_i, xref="x2", yref="y2", text=f"{y_i:.2f}", showarrow=True, arrowhead=1, ax=30, ay=-30, font=dict(color="red"), ) ) # Update axes fig.update_xaxes(title_text="Perplexity (k)", range=[xmin, xmax], row=1, col=1) fig.update_xaxes(title_text="Perplexity (k)", range=[xmin, xmax], row=1, col=2) fig.update_yaxes(title_text="Normalized cLISI", row=1, col=1) fig.update_yaxes(title_text="Normalized iLISI", row=1, col=2) fig.update_layout(template="plotly_white", showlegend=False) return fig