
Ray Data: Scalable Datasets for ML

Datasets leverages Ray’s task, actor, and object APIs to enable large-scale machine learning (ML) ingest, training, and inference, all within a single Python application.

Ray Data

In summary, Ray Data:

  • Is the standard way to load distributed data into Ray, supporting popular storage backends and file formats.
  • Supports common ML preprocessing operations including basic parallel data transformations such as map, batched map, and filter, and global operations such as sort, shuffle, groupby, and stats aggregations.
  • Supports operations requiring stateful setup and GPU acceleration.
  • Works seamlessly with Ray-integrated data processing libraries (Spark, Pandas, NumPy, Dask, Mars) and ML frameworks (TensorFlow, Torch, Horovod).
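
As a minimal sketch of what this looks like in practice (the S3 path and column names below are placeholders):

import ray

# Load a tabular dataset in parallel from cloud storage (placeholder path).
ds = ray.data.read_csv("s3://my-bucket/data.csv")

# Apply a simple last-mile transformation to each batch of rows.
def add_feature(batch):
    batch["c"] = batch["a"] * batch["b"]  # placeholder column names
    return batch

ds = ds.map_batches(add_feature)

# Consume the transformed data, e.g. to feed a trainer or batch inference job.
for batch in ds.iter_batches(batch_size=32):
    ...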

The Status Quo

Today, people often stitch together their training pipelines using various distributed compute frameworks. While this approach has the advantage of reusing existing systems, it comes with several drawbacks:

  1. Lack of programmability: you need to set up and manage 3+ separate distributed systems, and the interleaving between them makes the pipeline hard to orchestrate with workflow systems.
  2. Performance overhead: intermediate data must be written to external storage in order to cross between the distributed systems.

Ray Status Quo

Where Ray Comes In

Ray enables simple Python scripts to replace these pipelines, avoiding their tradeoffs and also improving performance. Ray Datasets are a key part of this vision, acting as the distributed Arrow format for exchanging data between distributed steps in Ray.

Overview

Datasets is fundamentally a distributed dataset abstraction: a Dataset holds references to one or more in-memory data blocks (partitions), which sit in distributed memory spread across a Ray cluster.

Ray Data Overview

This distributed representation allows for the Dataset to be built by distributed parallel tasks, each pulling a block’s worth of data from a source (e.g., S3) and putting the block into the node’s local object store, with the client-side Dataset object holding references to the distributed blocks. Operations on the client-side Dataset object then result in parallel operations on those blocks.

A Dataset’s blocks can contain data of any modality, including text, arbitrary binary bytes (e.g., images), and numerical data; however, the full power of Datasets is unlocked when used with tabular data. In this case, each block consists of a partition of a distributed table, and these row-based partitions are represented as Arrow Tables under the hood, yielding a distributed Arrow dataset.

Ray Data Arrow

This is a visualization of a Dataset that has three Arrow table blocks, with each block holding 1000 rows.
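
To make this concrete, here is a small sketch (using placeholder values) that builds a Dataset of a similar shape and inspects its blocks:

import ray

# 3,000 rows repartitioned into 3 blocks of roughly 1,000 rows each.
ds = ray.data.range(3000).repartition(3)

print(ds.num_blocks())  # -> 3
print(ds.count())       # -> 3000
print(ds.schema())      # schema of the underlying Arrow blocks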

Note, though, that Datasets is not intended as a replacement for generic data processing systems like Spark. Datasets is meant to be the last-mile bridge between ETL pipelines and distributed applications running on Ray. This bridge becomes especially powerful when you use Ray-integrated DataFrame libraries for the data processing stage, since the full data-to-ML pipeline then runs on top of Ray, eliminating the need to materialize data to external storage as an intermediate step. Ray serves as the universal compute substrate for your ML pipeline, with Datasets forming the distributed data bridge between pipeline stages.

Ray Data Bridge

When the entire pipeline runs on Ray, distributed data can seamlessly pass from relational data processing to model training without touching a disk or centralized data broker.
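
As a sketch of what this can look like with Dask-on-Ray (the path and label column below are placeholders):

import ray
import dask.dataframe as dd
from ray.util.dask import enable_dask_on_ray

enable_dask_on_ray()  # route Dask's task graph onto the Ray cluster

# ETL step with a Ray-integrated DataFrame library (placeholder path).
ddf = dd.read_parquet("s3://bucket/raw").dropna()

# Bridge into Ray Datasets in (distributed) memory, with no intermediate storage.
ds = ray.data.from_dask(ddf)

# Hand the data straight to a Torch trainer (placeholder label column).
torch_ds = ds.to_torch(label_column="target", batch_size=64)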

Basic Features

Scalable parallel I/O

Datasets aims to be a universal parallel data loader, data writer, and exchange format, providing a narrow data waist for Ray applications and libraries to interface with. This is accomplished by heavily leveraging Arrow’s I/O layer, using Ray’s high-throughput task execution to parallelize Arrow’s high-performance single-threaded I/O. The Datasets I/O layer has scaled to multi-petabyte data ingest jobs in production at Amazon.

Scalable Parallel I/O

All of Datasets’ scalable I/O is available behind a dead-simple API, expressible via a single call: ray.data.read_<format>().

Data format compatibility

With Arrow’s I/O layer comes support for many tabular file formats (JSON, CSV, Parquet) and storage backends (local disk, S3, GCS, Azure Blob Storage, HDFS). Beyond tabular data, there's added support for parallel reads and writes of NumPy, text, and binary files. This comprehensive support for reading many formats from many external sources, coupled with the extremely scalable parallelization scheme, makes Datasets the preferred way to ingest large amounts of data into a Ray cluster.

# Read structured data from disk, cloud storage, etc.
ray.data.read_parquet("s3://path/to/parquet")
ray.data.read_json("...")
ray.data.read_csv("...")
ray.data.read_text("...")
 
# Read tensor / image / file data.
ray.data.read_numpy("...")
ray.data.read_binary_files("...")
 
# Create from in-memory objects.
ray.data.from_items([list, of, python, objects])
ray.data.from_pandas([list, of, pandas, dfs])
ray.data.from_numpy([list, of, numpy, arrays])
ray.data.from_arrow([list, of, arrow, tables])

Data framework compatibility

In addition to storage I/O, Datasets also allows for bidirectional in-memory data exchange with many popular distributed frameworks when they are run on Ray, such as Spark, Dask, Modin, and Mars, as well as Pandas and NumPy for small local in-memory data. For convenient ingestion of data into model trainers, Datasets provides an exchange API for both PyTorch and TensorFlow, yielding the familiar framework-specific datasets, torch.utils.data.IterableDataset and tf.data.Dataset.

# Convert from existing DataFrames.
ray.data.from_spark(spark_df)
ray.data.from_dask(dask_df)
ray.data.from_modin(modin_df)
 
# Convert to DataFrames and ML datasets.
dataset.to_spark()
dataset.to_dask()
dataset.to_modin()
dataset.to_torch()
dataset.to_tf()
 
# Convert to objects in the shared memory object store.
dataset.to_numpy_refs()
dataset.to_arrow_refs()
dataset.to_pandas_refs()

Consuming Data

Pass datasets to Ray Tasks or Actors, and access records with methods like take_batch() and iter_batches().

@ray.remote
def consume(ds: ray.data.Dataset) -> int:
    num_batches = 0
    for batch in ds.iter_batches(batch_size=8):
        num_batches += 1
    return num_batches
 
ray.get(consume.remote(transformed_ds))

Or with an Actor:

@ray.remote
class Worker:
 
    def train(self, data_iterator):
        for batch in data_iterator.iter_batches(batch_size=8):
            pass
 
workers = [Worker.remote() for _ in range(4)]
shards = transformed_ds.streaming_split(n=4, equal=True)
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])

Last-mile preprocessing

Datasets offers convenient data preprocessing functionality for common last-mile transformations that you wish to perform right before training your model or doing batch inference. "Last-mile preprocessing" covers transformations that differ across models or that involve per-run or per-epoch randomness. Datasets allows you to perform these operations in parallel while keeping everything in (distributed) memory with .map_batches(fn), with no need to persist the results back to storage before starting to train your model or run batch inference.

# Simple transforms.
dataset.map(fn)
dataset.flat_map(fn)
dataset.map_batches(fn)
dataset.filter(fn)
 
# Aggregate operations.
dataset.repartition()
dataset.groupby()
dataset.aggregate()
dataset.sort()
 
# ML Training utilities.
dataset.random_shuffle()
dataset.split()
dataset.iter_batches()

User defined functions

Apply user-defined functions (UDFs) to transform datasets. Ray executes transformations in parallel for performance.

from typing import Dict
import numpy as np
 
# Assumes ``ds`` is a Dataset loaded from the Iris CSV (e.g. via ray.data.read_csv).
# Compute a "petal area" attribute.
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch
 
transformed_ds = ds.map_batches(transform_batch)
print(transformed_ds.materialize())
MaterializedDataset(
   num_blocks=...,
   num_rows=150,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64,
      petal area (cm^2): double
   }
)

Stateful GPU tasks

To enable batch inference on large datasets, Datasets supports running stateful computations on GPUs. This is quite simple: instead of calling .map_batches(fn) with a stateless function, call .map_batches(callable_cls, compute="actors"). The callable class will be instantiated on a Ray actor, and re-used multiple times to transform input batches for inference:

import pandas as pd
import ray

# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
    # Placeholder preprocessing; decode / resize the raw image bytes here.
    return image
 
class BatchInferModel:
    def __init__(self):
        # ImageNetModel is a placeholder for your own model class.
        self.model = ImageNetModel()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)
 
ds = ray.data.read_binary_files("s3://bucket/image-dir")
 
# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
 
# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]
 
# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")

Pipelined compute with Datasets

Reading into, transforming, and consuming/writing out of your Dataset creates a series of execution stages. By default, these stages are eagerly executed via blocking calls, which provides an easy-to-understand bulk synchronous parallel execution model and maximal parallelism for each stage:

Ray Data Pipelined

However, this doesn’t allow you to overlap computation across stages: when the first data block is done loading, we can’t start transforming it until all other data blocks are done being loaded as well. If different stages require different resources, this lock-step execution may over-saturate the current stage’s resources while leaving all other stages’ resources idle. Pipelining solves this problem:

Ray Data Pipelined

Pipelining is natively supported in the Datasets API: simply call .window() or .repeat() to generate a DatasetPipeline that can be read, transformed, and written just like a normal Dataset. This means you can easily incrementally process or stream data for ML training and inference. Read more about it in our pipelining docs.
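
A minimal sketch of windowed execution with the DatasetPipeline API described above (the paths and the transform are placeholders):

import ray

ds = ray.data.read_parquet("s3://bucket/input")  # placeholder path

# Windowing yields a DatasetPipeline, so loading, transforming, and writing
# of successive windows overlap instead of running stage-by-stage.
pipe = ds.window(blocks_per_window=20)
pipe = pipe.map_batches(lambda batch: batch)  # placeholder transform
pipe.write_parquet("s3://bucket/output")

# For multi-epoch training, .repeat(n) loops over the data n times, e.g.:
#   ds.repeat(10).random_shuffle_each_window().iter_batches(batch_size=32)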

Distributed Dataplane

Ray has a robust distributed dataplane, combining decentralized scheduling with a best-in-class distributed object layer, featuring:

  • Efficient zero-copy reads via shared memory for workers on the same node, obviating the need for serialization when sharing data across worker processes.
  • Locality-aware scheduling, where data-intensive tasks are scheduled onto the node that has the most of the task’s required data already local.
  • Resilient object transfer protocols, with a memory manager that ensures prefetching and forward progress while bounding the amount of memory usage.
  • A fast data transport implementation, transferring data chunks in parallel in order to maximize transfer throughput.

Given Ray's distributed dataplane, building a library like Datasets becomes comparatively simple. Datasets delegates most of the heavy lifting to the Ray dataplane, focusing on providing higher-level features such as convenient APIs, data format support, and stage pipelining.
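
For instance, the zero-copy shared-memory reads mentioned above can be observed directly with Ray's object APIs; a small sketch:

import numpy as np
import ray

# Put a large NumPy array into the shared-memory object store.
arr_ref = ray.put(np.zeros(10**8, dtype=np.uint8))

@ray.remote
def inspect(arr: np.ndarray) -> bool:
    # On the same node, the array arrives as a read-only view backed by
    # shared memory rather than a per-worker copy.
    return arr.flags.writeable

assert ray.get(inspect.remote(arr_ref)) is False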


Resources: