TileDB-SOMA-ML

TileDB-SOMA-ML is a machine learning library for working with SOMA data formats.

See /shuffle for a visualization of the batching/shuffling algorithm:

Animation showing the stages of batching and shuffling TileDB-SOMA-ML performs

Module Contents

dataset

class tiledbsoma_ml.ExperimentDataset(query: ExperimentAxisQuery | None = None, layer_name: str | None = None, x_locator: XLocator | None = None, query_ids: QueryIDs | None = None, obs_column_names: Sequence[str] = ('soma_joinid',), batch_size: int = 1, io_batch_size: int = 65536, shuffle: bool = True, shuffle_chunk_size: int = 64, seed: int | None = None, return_sparse_X: bool = False, use_eager_fetch: bool = True)[source]

An IterableDataset implementation that reads from an ExperimentAxisQuery.

Provides an Iterator over MiniBatchs of obs and X data. Each MiniBatch is a tuple containing an ndarray and a pd.DataFrame.

An ExperimentDataset can be passed to experiment_dataloader to enable multi-process reading/fetching.

For example:

>>> from tiledbsoma import Experiment, AxisQuery
>>> from tiledbsoma_ml import ExperimentDataset, experiment_dataloader
>>> with Experiment.open("my_experiment_path") as exp:
...     with exp.axis_query(
...         measurement_name="RNA",
...         obs_query=AxisQuery(value_filter="tissue_type=='lung'")
...     ) as query:
...         ds = ExperimentDataset(query)
...         dl = experiment_dataloader(ds)
>>> X_batch, obs_batch = next(iter(dl))
>>> X_batch
array([0., 0., 0., ..., 0., 0., 0.], dtype=float32)
>>> obs_batch
soma_joinid
0     57905025

When __iter__ is invoked, obs_joinids goes through several partitioning, shuffling, and batching steps, ultimately yielding "mini batches" (tuples of matched X and obs rows):

  1. Partitioning (NDArrayJoinId):

    1. GPU-partitioning: if this is one of \(N>1\) GPU processes (see get_distributed_rank_and_world_size), obs_joinids is partitioned so that the $N$ GPUs will each receive the same number of samples (meaning up to $N-1$ samples may be dropped). Then, only the partition corresponding to the current GPU is kept, The resulting obs_joinids is used in subsequent steps.

    2. DataLoader-worker partitioning: if this is one of $M>1$ DataLoader-worker processes (see get_worker_id_and_num), obs_joinids is further split $M$ ways, and only obs_joinids corresponding to the current process are kept.

  2. Shuffle-chunking (List[NDArrayJoinId]): if shuffle=True, obs_joinids are broken into “shuffle chunks”. The chunks are then shuffled amongst themselves (but retain their chunk-internal order, at this stage). If shuffle=False, one “chunk” is emitted containing all obs_joinids.

  3. IO-batching (Iterable[IOBatch]): shuffle-chunks are re-grouped into “IO batches” of size io_batch_size. If shuffle=True, each IOBatch is shuffled, then the corresponding X and obs rows are fetched from the underlying Experiment.

  4. Mini-batching (Iterable[MiniBatch]): IOBatch tuples are re-grouped into “mini batches” of size batch_size.

Shuffling support (in steps 2. and 3.) is enabled with the shuffle parameter, and should be used in lieu of DataLoader’s default shuffling functionality. Similarly, batch_size should be used instead of DataLoader’s default batching. experiment_dataloader is the recommended way to wrap an ExperimentDataset in a DataLoader, as it enforces these constraints while passing through other DataLoader args.

Describing the whole process another way, we read randomly selected groups of obs coordinates from across all ExperimentAxisQuery results, concatenate those into an I/O buffer, shuffle the buffer element-wise, fetch the full row data (X and obs) for each coordinate, and send that on to PyTorch / the GPU, in mini-batches. The randomness of the shuffle is determined by:

  • shuffle_chunk_size: controls the granularity of the global shuffle. shuffle_chunk_size=1 corresponds to a full global shuffle, but decreases I/O performance. Larger values cause chunks of rows to be shuffled, increasing I/O performance (by taking advantage of data locality in the underlying Experiment) but decreasing overall randomness of the yielded data.

  • io_batch_size: number of rows to fetch at once (comprised of concatenated shuffle-chunks, and shuffled row-wise). Larger values increase shuffle-randomness (by shuffling more “shuffle chunks” together), I/O performance, and memory usage.

Lifecycle:

experimental

__init__(query: ExperimentAxisQuery | None = None, layer_name: str | None = None, x_locator: XLocator | None = None, query_ids: QueryIDs | None = None, obs_column_names: Sequence[str] = ('soma_joinid',), batch_size: int = 1, io_batch_size: int = 65536, shuffle: bool = True, shuffle_chunk_size: int = 64, seed: int | None = None, return_sparse_X: bool = False, use_eager_fetch: bool = True)[source]

Construct a new ExperimentDataset.

Parameters:
  • queryExperimentAxisQuery defining data to iterate over. This constructor requires {query,layer_name} xor {x_locator,query_ids}.

  • layer_nameX layer to read. This constructor requires {query,layer_name} xor {x_locator,query_ids}.

  • x_locatorXLocator pointing to an X array to read. This constructor requires {query,layer_name} xor {x_locator,query_ids}.

  • query_idsQueryIDs containing obs and var joinids to read. This constructor requires {query,layer_name} xor {x_locator,query_ids}.

  • obs_column_names – The names of the obs columns to return. At least one column name must be specified. Default is ('soma_joinid',).

  • batch_size – The number of rows of X and obs data to yield in each MiniBatch. When batch_size is 1 (the default) and return_sparse_X is False (also default), the yielded ndarrays will have rank 1 (representing a single row); larger values of batch_size (or return_sparse_X is True) will result in arrays of rank 2 (multiple rows). Note that a batch_size of 1 allows this IterableDataset to be used with DataLoader batching, but higher performance can be achieved by performing batching in this class, and setting the DataLoaders batch_size parameter to None.

  • io_batch_size

    The number of obs/X rows to retrieve when reading data from SOMA. This impacts:

    1. Maximum memory utilization, larger values provide better read performance, but require more memory.

    2. The number of rows read prior to shuffling (see the shuffle parameter for details).

    The default value of 65,536 provides high performance but may need to be reduced in memory-limited hosts or when using a large number of DataLoader workers.

  • shuffle – Whether to shuffle the obs and X data being returned. Defaults to True.

  • shuffle_chunk_size – Global-shuffle granularity; larger numbers correspond to less randomness, but greater read performance. “Shuffle chunks” are contiguous rows in the underlying Experiment, and are shuffled among themselves before being combined into IO batches (which are internally shuffled, before fetching and finally mini-batching). If shuffle == False, this parameter is ignored.

  • seed – The random seed used for shuffling. Defaults to None (no seed). This argument MUST be specified when using DistributedDataParallel to ensure data partitions are disjoint across worker processes.

  • return_sparse_X – If True, will return the X data as a csr_matrix. If False (the default), will return X data as a ndarray.

  • use_eager_fetch – Fetch the next SOMA chunk of obs and X data immediately after a previously fetched SOMA chunk is made available for processing via the iterator. This allows network (or filesystem) requests to be made in parallel with client-side processing of the SOMA data, potentially improving overall performance at the cost of doubling memory utilization. Defaults to True.

Raises:

ValueError – on unsupported or malformed parameter values.

Lifecycle:

experimental

Warning

When using this class in any distributed mode, calling the set_epoch() method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.

In addition, when using shuffling in a distributed configuration (e.g., DDP), you must provide a seed, ensuring that the same shuffle is used across all replicas.

__iter__() Iterator[Tuple[ndarray[Any, dtype[number[Any]]] | csr_matrix, DataFrame]][source]

Emit MiniBatchs (aligned X and obs rows).

Returns:

Iterator[MiniBatch]

Lifecycle:

experimental

__len__() int[source]

Return the number of batches this iterable will produce. If run in the context of torch.distributed or as a multi-process loader (i.e., DataLoader instantiated with num_workers > 0), the batch count will reflect the size of the data partition assigned to the active process.

See important caveats in the PyTorch DataLoader documentation regarding len(dataloader), which also apply to this class.

Returns:

int (number of batches).

Lifecycle:

experimental

batch_size: int

Number of rows of X and obs data to yield in each MiniBatch.

epoch: int
io_batch_size: int

Number of obs/X rows to fetch together, when reading from the provided ExperimentAxisQuery.

property layer_name: str | None
property measurement_name: str
obs_column_names: List[str]

Names of obs columns to return.

query_ids: QueryIDs

obs/var coordinates (from an ExperimentAxisQuery) to iterate over.

random_split(*fracs: float, seed: int | None = None, method: Literal['deterministic', 'multinomial', 'stochastic_rounding'] = 'stochastic_rounding') Tuple[ExperimentDataset, ...][source]

Split this ExperimentDataset into 1 or more ExperimentDataset‘s, randomly sampled according fracs.

  • fracs must sum to $1$

  • seed is optional

  • method: see SamplingMethod for details

rank: int
return_sparse_X: bool

When True, return X data as a csr_matrix (by default, return ndarrays).

seed: int | None

Random seed used for shuffling.

set_epoch(epoch: int) None[source]

Set the epoch for this Data iterator.

When shuffle is True, this will ensure that all replicas use a different random ordering for each epoch. Failure to call this method before each epoch will result in the same data ordering across all epochs.

This call must be made before the per-epoch iterator is created.

property shape: Tuple[int, int]

Return the number of batches and features that will be yielded from this Experiment.

If used in multiprocessing mode (i.e. DataLoader instantiated with num_workers > 0), the number of batches will reflect the size of the data partition assigned to the active process.

Returns:

number of batches, number of vars.

Return type:

A tuple of two int values

Lifecycle:

experimental

shuffle: bool

Whether to shuffle the obs and X data being returned.

shuffle_chunk_size: int

Number of contiguous rows shuffled as an atomic unit (before later concatenation and shuffling within IOBatchs).

use_eager_fetch: bool

Pre-fetch one “IO batch” and one “mini batch”.

world_size: int
x_locator: XLocator

State required to open an X SparseNDArray (and associated obs DataFrame), within an Experiment.

dataloader

tiledbsoma_ml.experiment_dataloader(ds: ExperimentDataset, **dataloader_kwargs: Any) DataLoader[source]

DataLoader factory method for safely wrapping an ExperimentDataset.

Several DataLoader constructor parameters are not applicable, or are non-performant when using loaders from this module, including shuffle, batch_size, sampler, and batch_sampler. Specifying any of these parameters will result in an error.

Refer to the DataLoader docs for more information on DataLoader parameters, and ExperimentDataset for info on corresponding parameters.

Parameters:
  • ds – A IterableDataset. May include chained data pipes.

  • **dataloader_kwargs – Additional keyword arguments to pass to the DataLoader constructor, except for shuffle, batch_size, sampler, and batch_sampler, which are not supported when using data loaders in this module.

Returns:

DataLoader

Raises:

ValueError – if any of the shuffle, batch_size, sampler, or batch_sampler params are passed as keyword arguments.

Lifecycle:

experimental

Batching and Data Management

_common

Type aliases used in ExperimentDataset and experiment_dataloader.

tiledbsoma_ml._common.MiniBatch

Yielded by ExperimentDataset; pairs a slice of X rows with corresponding obs rows.

When return_sparse_X is False (the default), a MiniBatch is a tuple of ndarray and pd.DataFrame (for X and obs, respectively). If batch_size=1, the ndarray will have rank 1 (representing a single row), otherwise it will have rank 2. If return_sparse_X is True, the X slice is returned as a csr_matrix (which is always rank 2).

alias of Tuple[ndarray[Any, dtype[number[Any]]] | csr_matrix, DataFrame]

tiledbsoma_ml._common.NDArrayJoinId

ndarray of ``int64``s representing SOMA joinids.

alias of ndarray[Any, dtype[int64]]

_query_ids

Shuffle-chunk and partition (across GPU and DataLoader-worker processes) while reading from a SOMA Experiment.

QueryIDs

class tiledbsoma_ml._query_ids.QueryIDs(*, obs_joinids: ndarray[Any, dtype[int64]], var_joinids: ndarray[Any, dtype[int64]], partition: Partition | None = None)[source]

Wrapper for obs and var IDs from an ExperimentAxisQuery.

Serializable across multiple processes.

obs_joinids: ndarray[Any, dtype[int64]]

obs row coordinates to read.

var_joinids: ndarray[Any, dtype[int64]]

var column coordinates to read.

partition: Partition | None

GPU/Worker-partition info; typically populated by partitioned()

classmethod create(query: ExperimentAxisQuery) QueryIDs[source]

Initialize a QueryIDs object from an ExperimentAxisQuery.

random_split(*fracs: float, seed: int | None = None, method: Literal['deterministic', 'multinomial', 'stochastic_rounding'] = 'stochastic_rounding') Tuple[QueryIDs, ...][source]

Split this QueryIDs into 1 or more QueryIDs, randomly sampled according fracs.

  • fracs must sum to $1$

  • seed is optional

  • method: see SamplingMethod for details

partitioned(partition: Partition) QueryIDs[source]

Create a new QueryIDs with obs_joinids corresponding to a given GPU/worker Partition.

If None is provided, world size, rank, num workers, and worker ID will be inferred using helper functions that read env vars (see get_distributed_rank_and_world_size, get_worker_id_and_num).

When WORLD_SIZE > 1, each GPU will receive the same number of samples (meaning up to WORLD_SIZE-1 samples may be dropped).

shuffle_chunks(shuffle_chunk_size: int, seed: int | None = None) List[ndarray[Any, dtype[int64]]][source]

Divide obs_joinids into chunks of size shuffle_chunk_size, and shuffle them.

Used as a compromise between a full random shuffle (optimal for training performance/convergence) and a sequential, un-shuffled traversal (optimal for I/O efficiency).

Partition

class tiledbsoma_ml._query_ids.Partition(rank: int, world_size: int, worker_id: int, n_workers: int)[source]
rank: int

GPU-process rank.

world_size: int

Number of GPU processes.

worker_id: int

DataLoader-worker index.

n_workers: int

Number of DataLoader-workers (within this GPU process)

Chunks

tiledbsoma_ml._query_ids.Chunks

Return-type of QueryIDs.shuffle_chunks, List of ndarrays.

SamplingMethod

tiledbsoma_ml._query_ids.SamplingMethod

Enum arg to QueryIDs.random_split:

  • "deterministic": number of each class returned will always be \(frac \times N\), rounded to nearest int, e.g. n=12, fracs=[.7,.3] will always produce 8 and 4 elements, resp.

  • "multinomial": each element is assigned to a class independently; no guarantees are made about resulting class sizes.

  • "stochastic_rounding": guarantee each class gets assigned at least \(\lfloor frac \times N \rfloor\) elements. The remainder are then distributed so that class-size expected-values match the provided fracs.

_io_batch_iterable

tiledbsoma_ml._io_batch_iterable.IOBatch

Tuple type emitted by IOBatchIterable, containing X rows (as a CSR_IO_Buffer) and obs rows (pd.DataFrame).

alias of Tuple[CSR_IO_Buffer, DataFrame]

class tiledbsoma_ml._io_batch_iterable.IOBatchIterable(chunks: List[ndarray[Any, dtype[int64]]], io_batch_size: int, obs: DataFrame, var_joinids: ndarray[Any, dtype[int64]], X: SparseNDArray, obs_column_names: Sequence[str] = ('soma_joinid',), seed: int | None = None, shuffle: bool = True, use_eager_fetch: bool = True)[source]

Given a list of obs_joinid Chunks, re-chunk them into (optionally shuffled) IOBatch’s”.

An IOBatch is a tuple consisting of a batch of rows from the X SparseNDArray, as well as the corresponding rows from the obs DataFrame. The X rows are returned in an optimized CSR_IO_Buffer.

X: SparseNDArray
chunks: List[ndarray[Any, dtype[int64]]]
property io_batch_ids: Iterable[Tuple[int, ...]]

Re-chunk obs_joinids according to the desired io_batch_size.

io_batch_size: int
obs: DataFrame
obs_column_names: Sequence[str]
seed: int | None
shuffle: bool
use_eager_fetch: bool
var_joinids: ndarray[Any, dtype[int64]]

_mini_batch_iterable

class tiledbsoma_ml._mini_batch_iterable.MiniBatchIterable(io_batch_iter: IOBatchIterable, batch_size: int, use_eager_fetch: bool = True, return_sparse_X: bool = False)[source]

Convert (possibly shuffled) IOBatchIterable into MiniBatch’s suitable for passing to PyTorch.

batch_size: int
io_batch_iter: IOBatchIterable
maybe_squeeze(mini_batch: Tuple[ndarray[Any, dtype[number[Any]]] | csr_matrix, DataFrame]) Tuple[ndarray[Any, dtype[number[Any]]] | csr_matrix, DataFrame][source]
return_sparse_X: bool
use_eager_fetch: bool

x_locator

class tiledbsoma_ml.x_locator.XLocator(*, uri: str, measurement_name: str, layer_name: str, tiledb_timestamp_ms: int, tiledb_config: Dict[str, str | float])[source]

State required to open an X SparseNDArray (and associated obs DataFrame), within an Experiment.

Serializable across multiple processes.

classmethod create(experiment: Experiment, measurement_name: str, layer_name: str) XLocator[source]

Initialize an XLocator object from an Experiment, measurement_name, and layer_name.

The arguments provide sufficient info to identify a specific X “layer” in the provided Experiment.

_distributed

Utilities for multiprocess training: determine GPU “rank” / “world_size” and DataLoader worker ID / count.

tiledbsoma_ml._distributed.get_distributed_rank_and_world_size() Tuple[int, int][source]

Return tuple containing equivalent of torch.distributed rank and world size.

tiledbsoma_ml._distributed.get_worker_id_and_num() Tuple[int, int][source]

Return DataLoader ID, and the total number of DataLoader workers.

tiledbsoma_ml._distributed.init_multiprocessing() None[source]

Ensures use of “spawn” for starting child processes with multiprocessing.

Note

Private.

_csr

CSR sparse matrix implementation, optimized for incrementally building from COO matrices.

Private module.

class tiledbsoma_ml._csr.CSR_IO_Buffer(indptr: ndarray[Any, dtype[unsignedinteger[Any]]], indices: ndarray[Any, dtype[unsignedinteger[Any]]], data: ndarray[Any, dtype[number[Any]]], shape: Tuple[int, int])[source]

Implement a minimal CSR matrix with specific optimizations for use in this package.

Operations supported are:
  • Incrementally build a CSR from COO, allowing overlapped I/O and CSR conversion for I/O batches, and a final “merge” step which combines the result.

  • Zero intermediate copy conversion of an arbitrary row slice to dense (i.e., mini-batch extraction).

  • Parallel processing, where possible (construction, merge, etc.).

  • Minimize memory use for index arrays.

Overall is significantly faster, and uses less memory, than the equivalent scipy.sparse operations.

__init__(indptr: ndarray[Any, dtype[unsignedinteger[Any]]], indices: ndarray[Any, dtype[unsignedinteger[Any]]], data: ndarray[Any, dtype[number[Any]]], shape: Tuple[int, int]) None[source]

Construct from PJV format.

static from_ijd(i: ndarray[Any, dtype[unsignedinteger[Any]]], j: ndarray[Any, dtype[unsignedinteger[Any]]], d: ndarray[Any, dtype[number[Any]]], shape: Tuple[int, int]) CSR_IO_Buffer[source]

Build a CSR_IO_Buffer from a COO sparse matrix representation.

static from_pjd(p: ndarray[Any, dtype[unsignedinteger[Any]]], j: ndarray[Any, dtype[unsignedinteger[Any]]], d: ndarray[Any, dtype[number[Any]]], shape: Tuple[int, int]) CSR_IO_Buffer[source]

Build a CSR_IO_Buffer from a SCR sparse matrix representation.

property nnz: int

Number of nonzero elements.

property nbytes: int

Total bytes used by indptr, indices, and data arrays.

property dtype: dtype[Any] | None | type[Any] | _SupportsDType[dtype[Any]] | str | tuple[Any, int] | tuple[Any, SupportsIndex | Sequence[SupportsIndex]] | list[Any] | _DTypeDict | tuple[Any, Any]

Underlying Numpy dtype.

slice_tonumpy(row_index: slice) ndarray[Any, dtype[number[Any]]][source]

Extract slice as a dense ndarray.

Does not assume any particular ordering of minor axis.

slice_toscipy(row_index: slice) csr_matrix[source]

Extract slice as a sparse.csr_matrix.

Does not assume any particular ordering of minor axis, but will return a canonically ordered scipy sparse object.

static merge(mtxs: Sequence[CSR_IO_Buffer]) CSR_IO_Buffer[source]

Merge CSR_IO_Buffers.

sort_indices() Self[source]

Sort indices (in place).

tiledbsoma_ml._csr.smallest_uint_dtype(max_val: int) Type[unsignedinteger[Any]][source]

Return the smallest unsigned-int dtype that can contain max_val.