import tensorflow as tf
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import altair as alt
import os
from .normalizers import get_norm_method
from .tf_dataset_pipeline import prepare_tf_dataset
from .load_yaml import yaml_to_dataframes
from ..plotting.plots import boxplot, barplot
class Dataset:
    """Represents an L1000 Dataset.

    Parameters
    ----------
    data : ``dataframe``, shape (n_samples, (n_genes + n_metadata_fields))
        A sample by gene expression matrix padded to the right with per sample
        metadata. Generally it is easiest to construct a Dataset from a class
        method, ``Dataset.from_yaml()`` or ``Dataset.from_dataframes()``.
    gene_meta : ``dataframe``, shape (n_genes, n_features)
        Contains the metadata for each of the genes in the data matrix.
    n_genes : ``int``
        Number of genes in expression matrix. This explicitly defines
        the column index which divides the expression values and metadata.

    Attributes
    ----------
    data : ``dataframe``, shape (n_samples, n_genes)
        Gene expression matrix as a dataframe. Shared indices with
        ``self.sample_meta`` and ``self.gene_meta``.
    sample_meta : ``dataframe``, shape (n_samples, n_metadata_features)
        Per profile metadata. Row index same as ``Dataset.data.index``.
    gene_meta : ``dataframe``, shape (n_genes, n_gene_features)
        Gene metadata. Row index same as ``Dataset.data.columns``.
    """

    def __init__(self, data, gene_meta, n_genes):
        # _data holds expression values and per-sample metadata side by side;
        # n_genes marks the column boundary between the two.
        self._data = data
        self.gene_meta = gene_meta
        self.n_genes = n_genes

    @property
    def data(self):
        """A dataframe representing the sample x gene expression matrix."""
        return self._data.iloc[:, : self.n_genes]

    @property
    def sample_meta(self):
        """A dataframe representing the per sample metadata."""
        return self._data.iloc[:, self.n_genes :]

    @classmethod
    def from_dataframes(cls, data_df, sample_meta_df, gene_meta_df):
        """Dataset constructor method from multiple dataframes.

        Parameters
        ----------
        data_df : ``dataframe``, shape (n_samples, n_genes)
            Contains the expression data from experiment. Must have a
            shared row index with ``sample_meta_df``.
        sample_meta_df : ``dataframe``, shape (n_samples, n_meta_features)
            Contains the metadata for each of the samples in experiment.
        gene_meta_df : ``dataframe``, shape (n_genes, n_gene_features)
            Contains the metadata for each of the genes in experiment.

        Returns
        -------
        ``Dataset``
        """
        data = data_df.join(sample_meta_df)
        return cls(data, gene_meta_df, len(gene_meta_df))

    @classmethod
    def from_yaml(cls, path, sample_ids=None, only_landmark=True, **filter_kwargs):
        """Dataset constructor method from yaml specification.

        Parameters
        ----------
        path : ``str``
            Valid string path to ``.yaml`` or ``.yml`` file.
        sample_ids : ``list`` (optional, default ``None``)
            Unique sample ids to read from data and metadata files.
        only_landmark : ``bool`` (optional, default ``True``)
            Whether to parse all genes or only the landmark genes.
        filter_kwargs :
            Optional keyword args to subset data by specific features
            in per sample metadata. ``keyword`` - a column in metadata,
            ``arg`` - a list of values to filter from keyword field.

        Returns
        -------
        ``Dataset``

        >>> Dataset.from_yaml("settings.yaml", cell_id=["MCF7", "PC3"], pert_id=["trt_cp"])
        """
        data_df, sample_meta_df, gene_meta_df = yaml_to_dataframes(
            path, sample_ids, only_landmark, **filter_kwargs
        )
        data = data_df.join(sample_meta_df)
        return cls(data, gene_meta_df, len(gene_meta_df))

    def sample_rows(self, size, replace=False, meta_groups=None):
        """Returns a Dataset of sampled profiles.

        Parameters
        ----------
        size : ``int``
            Number of samples to return (per meta grouping when
            ``meta_groups`` is given).
        replace : ``bool`` (optional, default ``False``)
            Sample with or without replacement.
        meta_groups : ``str`` or ``list`` (optional, default ``None``)
            If provided, equal numbers of profiles are returned for each
            metadata grouping.

        Returns
        -------
        ``Dataset``

        >>> dataset.sample_rows(size=5000, meta_groups="cell_id")
        >>> dataset.sample_rows(size=5000, meta_groups=["cell_id", "pert_type"])
        """
        if meta_groups is None:
            sampled = self._data.sample(size, replace=replace)
        else:
            # Shuffle all rows first so head(size) takes a random subset
            # from each metadata group.
            shuffled = self._data.sample(frac=1, replace=replace)
            sampled = shuffled.groupby(meta_groups).head(size)
        return self._copy(sampled)

    def filter_rows(self, **kwargs):
        """Returns a Dataset of filtered profiles.

        Parameters
        ----------
        kwargs :
            Keyword args to subset data by specific features in sample
            metadata. ``keyword``: a column in metadata, ``arg``: a value
            or list of values to keep from the keyword field.

        Returns
        -------
        ``Dataset``

        >>> dataset.filter_rows(cell_id=["VCAP", "PC3"])
        >>> dataset.filter_rows(cell_id="VCAP", pert_type=["ctl_vehicle", "trt_cp"])
        """
        filtered = self._data.copy()
        for colname, values in kwargs.items():
            values = [values] if isinstance(values, str) else values
            filtered = filtered[filtered[colname].isin(values)]
        return self._copy(filtered)

    def select_samples(self, sample_ids):
        """Returns a Dataset with profiles selected by id.

        Parameters
        ----------
        sample_ids : ``list``, character ``array``
            Desired sample ids to filter dataset.

        Returns
        -------
        ``Dataset``
        """
        # BUG FIX: original used ``self._data.isin(sample_ids)`` — an
        # element-wise mask over cell *values* that NaNs out non-matching
        # cells instead of selecting rows by sample id.
        mask = self._data.index.isin(sample_ids)
        return self._copy(self._data[mask])

    def split(self, **kwargs):
        """Returns a tuple of Datasets, split by inclusion criteria.

        Parameters
        ----------
        kwargs :
            Exactly one keyword arg subsetting data by a feature in sample
            metadata. ``keyword``: a column in metadata, ``arg``: a str or
            list of values to filter from keyword field.

        Returns
        -------
        ``Dataset``, ``Dataset``

        >>> pc3, not_pc3 = dataset.split(cell_id="PC3")
        >>> vcap_mcf7, not_vcap_mcf7 = dataset.split(cell_id=["VCAP", "MCF7"])
        """
        if len(kwargs) != 1:
            raise ValueError(
                "One keyword argument is required: Key must be a meta_data field."
            )
        ((colname, values),) = kwargs.items()
        values = [values] if isinstance(values, str) else values
        data = self._data.copy()
        mask = data[colname].isin(values)
        return self._copy(data[mask]), self._copy(data[~mask])

    def dropna(self, subset, inplace=False):
        """Drops profiles for which there is no metadata in subset.

        Parameters
        ----------
        subset : ``str`` or ``list``
            Metadata field or fields.
        inplace : ``bool`` (optional, default ``False``)
            If True, do operation inplace and return None.
        """
        if isinstance(subset, str):
            subset = [subset]
        if inplace:
            self._data.dropna(subset=subset, inplace=True)
        else:
            return self._copy(self._data.dropna(subset=subset))

    def set_categorical(self, meta_field):
        """Sets sample metadata column as categorical.

        Parameters
        ----------
        meta_field : ``str``
            Sample metadata column name.
        """
        self._data[meta_field] = pd.Categorical(self._data[meta_field])

    def normalize_by_gene(self, normalizer="standard_scale"):
        """Normalize expression by gene (in place).

        Parameters
        ----------
        normalizer : ``str`` or ``func`` (optional, default 'standard_scale')
            Method used to normalize the dataset. Valid str options are
            'standard_scale' and 'z_score'. If a function is provided, it
            must take one argument (``array``) and return an array of the
            same dimensions.

        Returns
        -------
        ``None``
        """
        normalizer = get_norm_method(normalizer)
        # Only the expression columns are normalized; metadata is untouched.
        self._data.iloc[:, : self.n_genes] = normalizer(self.data)

    def train_val_test_split(self, p1=0.2, p2=0.2):
        """Splits dataset into training, validation, and test datasets.

        Parameters
        ----------
        p1 : ``float`` (optional, default ``0.2``)
            Test size in first train/test split.
        p2 : ``float`` (optional, default ``0.2``)
            Validation size in remaining train/val split.

        Returns
        -------
        ``tuple`` of ``KerasDataset``'s: (train, validation, test)
        """
        X_train, X_test = train_test_split(self._data, test_size=p1)
        X_train, X_val = train_test_split(X_train, test_size=p2)
        train = KerasDataset(X_train, self.gene_meta.copy(), self.n_genes, "train")
        val = KerasDataset(X_val, self.gene_meta.copy(), self.n_genes, "validation")
        test = KerasDataset(X_test, self.gene_meta.copy(), self.n_genes, "test")
        return train, val, test

    def to_tsv(self, out_dir, sep="\t", prefix=None, **kwargs):
        """Write Dataset object to delimited text files.

        Writes ``<prefix_>data.tsv`` and ``<prefix_>sample_meta.tsv``
        into ``out_dir``.

        Parameters
        ----------
        out_dir : ``str``
            Path to output directory.
        sep : ``str`` (optional, default ``'\\t'``)
            String of length 1. Field delimiter for the output file.
        prefix : ``str`` (optional, default ``None``)
            Filename prefix.
        """
        os.makedirs(out_dir, exist_ok=True)  # create dirs if non-existent
        prefix = f"{prefix}_" if prefix else ""
        fpaths = [
            os.path.join(out_dir, f"{prefix}{suf}.tsv")
            for suf in ["data", "sample_meta"]
        ]
        # BUG FIX: original hard-coded sep="\t" here, silently ignoring
        # the caller-supplied ``sep`` argument.
        self.data.to_csv(fpaths[0], sep=sep, **kwargs)
        self.sample_meta.to_csv(fpaths[1], sep=sep, **kwargs)

    def one_hot_encode(self, meta_field):
        """Return a one-hot vector for a metadata field for all profiles.

        Parameters
        ----------
        meta_field : ``str``
            Valid sample metadata column.

        Returns
        -------
        one_hot : ``array``, (n_samples, n_categories)
        """
        return pd.get_dummies(self.sample_meta[meta_field]).values

    def plot_gene_boxplot(
        self, identifier, lookup_col=None, meta_field=None, extent=1.5,
    ):
        """Returns a boxplot of gene expression, faceted on metadata field.

        Parameters
        ----------
        identifier : ``str``
            String identifier for gene. Default should be one of
            ``self.gene_meta.index``.
        lookup_col : ``str`` (optional, default ``None``)
            Gene metadata column name. Will be used to look up the
            ``identifier`` param rather than the index.
        meta_field : ``str`` (optional, default ``None``)
            Sample metadata column name. Will make multiple boxplots for
            each metadata category.
        extent : ``str`` or ``float`` (optional, default ``1.5``)
            Can be either ``'min-max'``, with whiskers covering entire
            domain, or a number X where entries outside X stds are shown
            as individual points.

        Returns
        -------
        ``altair.Chart`` object

        >>> dataset.plot_gene_boxplot("Gene A", lookup_col="gene_name", meta_field="cell_id")
        >>> dataset.plot_gene_boxplot("5270")  # distribution for gene_id == '5270'
        """
        if lookup_col:
            gene_mask = self.gene_meta[lookup_col] == str(identifier)
        else:
            gene_mask = self.gene_meta.index.astype(str) == str(identifier)
        # BUG FIX: original called ``.astype(str)`` on the scalar index
        # element, which fails when the index holds plain Python strings;
        # str() handles numpy scalars and Python labels alike.
        gene_index = str(self.gene_meta[gene_mask].index[0])
        df = self._data[[gene_index, *self.sample_meta.columns]]
        df = df.rename(columns={gene_index: identifier})
        return boxplot(df=df, x=meta_field, y=df.columns[0], extent=extent)

    def copy(self):
        """Copies Dataset to a new object."""
        return Dataset(self._data.copy(), self.gene_meta.copy(), self.n_genes)

    def _copy(self, data):
        # Internal: wrap an already-derived dataframe in a new Dataset,
        # sharing copied gene metadata and the same n_genes boundary.
        return Dataset(data, self.gene_meta.copy(), self.n_genes)

    def __len__(self):
        return self._data.shape[0]

    def __repr__(self):
        nsamples, ngenes = self.data.shape
        return f"<L1000 Dataset: (samples: {nsamples:,}, genes: {ngenes:,})>"
class KerasDataset(Dataset):
    """Represents an L1000 Dataset to be ingested into a Keras pipeline.

    Parameters
    ----------
    data : ``dataframe``, shape (n_samples, (n_genes + n_metadata_fields))
        A sample by gene expression matrix padded to the right with per
        sample metadata.
    gene_meta : ``dataframe``, shape (n_genes, n_features)
        Contains the metadata for each of the genes in the data matrix.
    n_genes : ``int``
        Number of genes in expression matrix. This explicitly defines
        the column index which divides the expression values and metadata.
    name : ``str``
        Identifier for what type of dataset (``'train'``, ``'validation'``,
        ``'test'``).

    Attributes
    ----------
    data : ``dataframe``, shape (n_samples, n_genes)
        Gene expression matrix as a dataframe. Shared indices with
        ``self.sample_meta`` and ``self.gene_meta``.
    sample_meta : ``dataframe``, shape (n_samples, n_metadata_features)
        Per profile metadata. Row index same as ``self.data.index``.
    gene_meta : ``dataframe``, shape (n_genes, n_gene_features)
        Gene metadata. Row index same as ``self.data.columns``.
    """

    # Accepted dataset roles; only 'train' enables shuffling and repeating.
    _valid_names = ["train", "validation", "test"]

    def __init__(self, data, gene_meta, n_genes, name, **kwargs):
        super().__init__(data, gene_meta, n_genes, **kwargs)
        if name not in self._valid_names:
            # BUG FIX: message previously named 'LINCSKerasDataset', which
            # is not this class's name.
            raise ValueError(
                f"KerasDataset 'name' must be one of {self._valid_names}, not '{name}'."
            )
        self.name = name
        # BUG FIX: original used ``name is "train"`` — identity comparison
        # with a string literal, which is implementation-dependent and
        # raises SyntaxWarning on modern CPython. Use equality.
        self.shuffle, self.repeat = (True, True) if name == "train" else (False, False)

    def __call__(self, target, batch_size=64, batch_normalize=None):
        """Converts Dataset to a ``tf.data.Dataset`` to be ingested by Keras.

        Parameters
        ----------
        target : ``str`` or ``list``
            Valid sample metadata column(s) or ``'self'``. If ``'self'``,
            the outputs are designed to be the same as the inputs
            (i.e. an autoencoder).
        batch_size : ``int`` (optional, default ``64``)
            Size of batches during training and testing.
        batch_normalize : ``str`` (optional, default ``None``)
            Whether to batch normalize. Can be one of ``'standard_scale'``
            or ``'z_score'``.

        Returns
        -------
        tf_dataset : ``tensorflow.data.Dataset``

        >>> keras_dataset("cell_id", batch_size=128)  # TensorFlow prefetch-dataset
        """
        X = tf.data.Dataset.from_tensor_slices(self.data.values)
        y = self._get_target_as_tf_dataset(target)
        tf_dataset = prepare_tf_dataset(
            dataset=tf.data.Dataset.zip((X, y)),
            batch_size=batch_size,
            shuffle=self.shuffle,
            repeat=self.repeat,
            batch_normalize=batch_normalize,
            # Buffer the full dataset so shuffling is a true permutation.
            shuffle_buffer_size=self.data.shape[0],
        )
        return tf_dataset

    @classmethod
    def from_lincs_dataset(cls, lincs_dataset, name):
        """Constructor from base Dataset class.

        Parameters
        ----------
        lincs_dataset : ``Dataset``
            Base dataset whose data and metadata are wrapped.
        name : ``str``
            Type of dataset. Must be one of ``'train'``, ``'validation'``,
            or ``'test'``.
        """
        return cls(
            lincs_dataset._data, lincs_dataset.gene_meta, lincs_dataset.n_genes, name
        )

    def _get_target_as_tf_dataset(self, target):
        # Build the label array(s) matching the requested target spec.
        if target == "self":
            y = self.data.values  # autoencoder: outputs mirror inputs
        elif isinstance(target, str):
            y = self.one_hot_encode(target)
        elif isinstance(target, list):
            # Multi-output model: one one-hot array per metadata field.
            y = tuple(self.one_hot_encode(t) for t in target)
        else:
            # BUG FIX: original fell through with ``y`` unbound, raising a
            # confusing UnboundLocalError for unsupported targets.
            raise ValueError(
                f"'target' must be 'self', a str, or a list, not {target!r}."
            )
        return tf.data.Dataset.from_tensor_slices(y)

    def __repr__(self):
        nsamples, ngenes = self.data.shape
        return f"< {self.name} Dataset: (samples: {nsamples:,}, genes: {ngenes:,})>"