Ray AI Runtime (AIR)

As described earlier, Ray AI Runtime (AIR) is a scalable and unified toolkit for ML applications. Built on top of Ray Core, Ray AIR inherits all the performance and scalability benefits offered by Core while providing a convenient abstraction layer for machine learning. AIR enables simple scaling of individual workloads, end-to-end workflows, and popular ecosystem frameworks, all in just Python.

Ray AIR wraps five native Ray libraries that scale a specific stage of the ML workflow. In addition, Ray AIR brings together an ever-growing ecosystem of integrations with popular machine learning frameworks to create a common interface for development.

For data scientists, AIR can be used to scale individual workloads as well as end-to-end ML applications. For ML engineers, AIR provides scalable platform abstractions that can be used to easily onboard and integrate tooling from the broader ML ecosystem.

Here, we work through the entire ML workflow, from data loading to training and hyperparameter tuning to prediction and serving. Along the way, each section introduces key components of Ray AIR while building a machine learning application that predicts big tips using New York City taxi data.

| ML workflow stage | Ray AIR Component | Ray AIR key concept | NYC Taxi Use Case |
| --- | --- | --- | --- |
| Data loading and preprocessing | Ray Data | Preprocessor to load and transform data | Use a Preprocessor to load and transform the input data. |
| Model training | Ray Train | Trainer for supported ML frameworks (Keras, PyTorch, and more) | Use a Trainer to scale XGBoost model training. |
| Hyperparameter tuning | Ray Tune | Tuner for hyperparameter search | Use a Tuner for hyperparameter search. |
| Batch prediction | Ray AIR Predictor | BatchPredictor to load the model from the best checkpoint for batch inference | Use a BatchPredictor (part of Ray Train) to load the model from the best checkpoint for batch inference. |
| Model serving | Ray Serve | PredictorDeployment for online inference | Use a PredictorDeployment for online inference. |

For this classification task, we apply a simple XGBoost model (a gradient-boosted trees framework) to the June 2021 New York City Taxi & Limousine Commission's Trip Record Data. This dataset contains over 2 million samples of yellow cab rides, and the goal is to predict whether a trip will result in a tip greater than 20% or not.

Ray Data

First, we need to load in the taxi data and transform the raw input into cleaned features that will be passed to the XGBoost model.

Backed by PyArrow, Ray Datasets parallelize the loading and transforming of data and provide a standard way to pass references to data across Ray libraries and applications. Datasets are not intended to replace more general data-processing systems; instead, they serve as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.

Key features

  • Flexibility

    Datasets are compatible with a variety of file formats, data sources, and distributed frameworks. They work seamlessly with library integrations like Dask on Ray and can be passed between Ray tasks and actors without copying data.

  • Performance for ML Workloads

    Datasets offer important features like accelerator support, pipelining, and global random shuffles that accelerate ML training and inference workloads. They also support basic distributed data transformations such as map, filter, sort, groupby, and repartition (a brief sketch of these follows this list).

  • Persistent Preprocessor

    The Preprocessor primitive captures and stores the transformations applied to convert inputs into features. It is applied during training, tuning, batch prediction, and serving to keep the preprocessing consistent across the pipeline.

  • Built on Ray Core

    Datasets inherit scalability to hundreds of nodes, efficient memory usage, object spilling, and failure handling from Ray Core. Because Datasets are just lists of object references, they can be passed between tasks and actors without needing to make a copy of the data, which is crucial for making data-intensive applications and libraries scalable.
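
As a quick, minimal sketch of the distributed transformations mentioned above (the toy records, column name, and lambdas here are made up purely for illustration):

import ray

# A small toy Dataset of records.
ds = ray.data.from_items([{"value": i} for i in range(1000)])

ds = ds.repartition(10)                               # control the number of blocks
evens = ds.filter(lambda row: row["value"] % 2 == 0)  # distributed filter
# Batch-wise map using pandas DataFrames as the batch format.
doubled = evens.map_batches(lambda batch: batch * 2, batch_format="pandas")
print(doubled.sort("value").take(3))                  # distributed sort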

A general pattern is to create a Dataset, configure a Preprocessor, and pass both into the Trainer for consistent data handling throughout the pipeline.

To transform our raw data into features, we will define a Preprocessor. Ray AIR's Preprocessor captures the data transformations you apply and persists them:

  • During training
    • Preprocessor is passed into a Trainer to fit and transform input Datasets.
  • During tuning
    • Each Trial will create its own copy of the Preprocessor, and the fitting and transformation logic will occur once per Trial.
  • During checkpointing
    • The Preprocessor is saved in the Checkpoint if it was passed into the Trainer.
  • During predicting
    • If the Checkpoint contains a Preprocessor, then it will be used to call transform_batch on input batches prior to performing inference.

Ray AIR provides several preprocessors out of the box and also supports the implementation of custom preprocessors.

from ray import data
from ray.data.preprocessors import MinMaxScaler

# Read the Parquet file into a Ray Dataset.
dataset = data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2021.parquet"
)
# Split the Dataset into blocks for parallel preprocessing.
# `num_blocks` should be lower than number of cores in the cluster.
dataset = dataset.repartition(num_blocks=100)
# Create a Preprocessor to transform the data.
preprocessor = MinMaxScaler(columns=["trip_distance", "trip_duration"])
# Apply the Preprocessor to the Dataset.
transformed_dataset = preprocessor.fit_transform(dataset)

We can also do things like calculate the average fare_amount grouped by passenger_count in a distributed manner:

dataset.groupby("passenger_count").mean("fare_amount").show()

We can also visualize the results, using the integration with pandas to generate a histogram view:

# Visualize the transformed data.
transformed_df = transformed_dataset.to_pandas(limit=2704905)
transformed_df.hist("trip_distance")

Key concepts

Dataset

The standard way to load and exchange data in Ray AIR. In AIR, Datasets are used extensively for data loading and transformation. They are meant as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.

Preprocessor

Preprocessors are primitives that transform input data into features. They operate on Datasets, making them scalable and compatible with a variety of data sources and dataframe libraries.

Preprocessors persist through various stages of the pipeline:

  • During training to fit and transform input data
  • In each trial of hyperparameter tuning
  • Within a checkpoint
  • On input batches for inference

AIR comes with a collection of built-in preprocessors, and you can also define your own with simple templates (see the user guide for more information).
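
For instance, here is a minimal sketch of a function-based custom preprocessor built with BatchMapper; the added is_long_trip feature is hypothetical and only for illustration:

from ray.data.preprocessors import BatchMapper

# Hypothetical feature: flag rides longer than 10 miles.
def add_long_trip_flag(batch):
    # `batch` arrives as a pandas DataFrame.
    batch["is_long_trip"] = (batch["trip_distance"] > 10).astype(int)
    return batch

custom_preprocessor = BatchMapper(add_long_trip_flag)
featurized_dataset = custom_preprocessor.fit_transform(dataset)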

Ray Train

Here, we set up an XGBoost model to classify taxi rides based on predicted tip amounts using the dataset and preprocessor from the previous step. ML practitioners tend to run into a few common problems with training models that prompt them to consider distributed solutions:

  1. Training time is too long to be practical.
  2. The data is too large to fit on one machine.
  3. Training many models sequentially doesn't utilize resources efficiently.
  4. The model itself is too large to fit on a single machine.

Ray Train addresses these issues by improving performance through distributed multi-node training.

Ray Train's Trainers integrate well with the rest of the Ray ecosystem. They also come with some useful features:

  • Callbacks for early stopping
  • Checkpointing
  • Integration with experiment tracking tools like TensorBoard, Weights & Biases, and MLflow
  • Export mechanisms for models

The general flow is to define the Trainer object and then fit it to the training dataset. To construct a Trainer, you provide three base components:

  • A ScalingConfig, which specifies how many parallel training workers and what type of resources (CPUs/GPUs) to use per worker during training; it supports seamless scaling across heterogeneous hardware.
  • A dictionary of training and validation sets.
  • The Preprocessor used to transform the Datasets.

Optionally, you can add resume_from_checkpoint, which allows you to continue training from a saved checkpoint should the run be interrupted (a short sketch of this appears after the Result calls below).

This snippet uses an XGBoostTrainer; however, it may be swapped out for any integration or custom-defined Trainer:

from ray.air import session
from ray.data import Dataset
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

# For a custom (data-parallel) Trainer, each worker consumes its assigned
# Dataset shard in a training loop like this one. XGBoostTrainer handles
# data ingestion internally, so this function is shown for illustration only.
def train_loop_per_worker():
    # Get a handle to the worker's assigned Dataset shard.
    data_shard: Dataset = session.get_dataset_shard("train")

    # Manually iterate over the data for 10 epochs.
    for _ in range(10):
        for batch in data_shard.iter_batches():
            print("Do some training on batch", batch)


# Split the Dataset from the previous step into training and validation sets.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

trainer = XGBoostTrainer(
    label_column="is_big_tip",
    num_boost_round=50,
    scaling_config=ScalingConfig(
        num_workers=5,
        use_gpu=False,
    ),
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "tree_method": "approx",
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)

result = trainer.fit()

You can inspect the training results from the Result object with the following calls:

# returns the last saved checkpoint
result.checkpoint
# returns the `n` best saved checkpoints, as configured in the `RunConfig`'s `CheckpointConfig`
result.best_checkpoints
# returns the final metrics as reported
result.metrics
# returns an Exception if training failed
result.error
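
Should a run be interrupted, the resume_from_checkpoint option mentioned earlier lets a fresh Trainer pick up from the last saved checkpoint. A minimal sketch, reusing the datasets and preprocessor from above (the resumed_* names are just illustrative):

# Continue training from the checkpoint of the interrupted run.
resumed_trainer = XGBoostTrainer(
    label_column="is_big_tip",
    num_boost_round=50,
    scaling_config=ScalingConfig(num_workers=5, use_gpu=False),
    params={"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
    resume_from_checkpoint=result.checkpoint,
)
resumed_result = resumed_trainer.fit()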

Key Concepts

Checkpoint

Stores the full state of the model periodically, so that partially trained models are available and can be used to resume training from an intermediate point instead of starting from scratch. Checkpoints also allow the best model to be saved for batch inference later on.

Trainer

Trainers are wrapper classes around third-party training frameworks such as XGBoost, PyTorch, and TensorFlow. They are built to integrate with Ray Actors (for distribution), Ray Datasets, and Ray Tune.

Ray Tune

Now that you have a baseline XGBoost model trained, you can try to improve performance by running hyperparameter tuning experiments.

💡

Hyperparameter tuning, or hyperparameter optimization (HPO), is the process of choosing optimal hyperparameters for a machine learning model. Hyperparameters, in contrast to weights learned by the model, are parameters that you set to influence training.

Setting up and executing hyperparameter optimization (HPO) can be expensive in terms of compute resources and runtime, and it comes with several complexities:

  • Vast Search Space
    • Your model could have several hyperparameters, each with different data types, ranges, and possible correlations. Sampling good candidates from high-dimensional spaces is difficult.
  • Search Algorithms
    • Choosing hyperparameters strategically requires testing complex search algorithms to achieve good results.
  • Long Runtime
    • Even if tuning is distributed, training complex models can take a long time per run, so it pays to be efficient at every stage of the pipeline.
  • Resource Allocation
    • Each trial must have enough compute resources available so that the search isn't slowed down by scheduling mismatches.
  • User Experience
    • Developer-facing observability tooling, such as stopping bad runs early, saving intermediate results, restarting from checkpoints, and pausing/resuming runs, makes HPO easier.

Ray Tune is a distributed HPO library that addresses all of the challenges above, providing a simplified interface for running trials and integrating with popular frameworks such as HyperOpt and Optuna.

The general pattern for using AIR Tuners involves taking in a Trainer, defining a search space, establishing a search algorithm, scheduling trials, and analyzing results.

from ray import tune
from ray.tune import Tuner, TuneConfig

param_space = {
    "params": {
        "eta": tune.uniform(0.2, 0.4),  # learning rate
        "max_depth": tune.randint(1, 6),  # default=6; higher values mean more complex trees
        "min_child_weight": tune.uniform(0.8, 1.0),  # min sum of weights of all data in a child
    }
}

tuner = Tuner(
    trainer,  # the XGBoostTrainer defined above
    param_space=param_space,
    tune_config=TuneConfig(num_samples=3, metric="train-logloss", mode="min"),
)
result_grid = tuner.fit()

As an aside, Ray Tune provides a default checkpointing system for hyperparameter tuning. For particularly large models, it may be advisable to set up a CheckpointConfig, which defines a checkpointing strategy. In particular, you can set num_to_keep to avoid keeping checkpoints from fruitless trials on disk.
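
A minimal sketch of such a strategy, passed to the Tuner through a RunConfig (the num_to_keep value and score attribute here are just examples):

from ray.air.config import RunConfig, CheckpointConfig

tuner = Tuner(
    trainer,
    param_space=param_space,
    tune_config=TuneConfig(num_samples=3, metric="train-logloss", mode="min"),
    run_config=RunConfig(
        # Keep only the single best checkpoint per trial on disk.
        checkpoint_config=CheckpointConfig(
            num_to_keep=1,
            checkpoint_score_attribute="train-logloss",
            checkpoint_score_order="min",
        ),
    ),
)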

To set up an AIR Tuner, you must specify:

  • Trainer
    • The training loop from before; support for heterogeneous hardware is built in via each Trainer's ScalingConfig.
  • param_space
    • A set of hyperparameters you wish to tune.
  • TuneConfig
    • Sets the number of trials, the metric to optimize, and whether to minimize or maximize it.
  • search_algorithm
    • Optimizes parameter search (optional).
  • scheduler
    • Stops unpromising trials early to speed up experiments (optional; a sketch of both options follows this list).
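
As a minimal sketch of plugging in the optional search algorithm and scheduler, here using HyperOpt and ASHA as examples (any supported combination works; HyperOptSearch assumes the hyperopt package is installed):

from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.hyperopt import HyperOptSearch

tuner = Tuner(
    trainer,
    param_space=param_space,
    tune_config=TuneConfig(
        num_samples=20,
        metric="train-logloss",
        mode="min",
        search_alg=HyperOptSearch(),  # optional: smarter sampling of the search space
        scheduler=ASHAScheduler(),    # optional: stop unpromising trials early
    ),
)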

You can probe the ResultGrid for metrics using these calls:

# checks if there have been errors
result_grid.errors
# gets the best result
best_result = result_grid.get_best_result()
# gets the best checkpoint
best_checkpoint = best_result.checkpoint
# gets the best metrics
best_metrics = best_result.metrics

Key Concepts

Tuner

Provides an interface that works with AIR Trainers to perform distributed hyperparameter tuning. You define a set of hyperparameters you wish to tune in a search space, specify a search algorithm, and the Tuner returns its results in a ResultGrid that contains metrics, results, and checkpoints for each trial.

Ray AIR Predictors

Ray AIR Predictors load models from checkpoints generated during training or tuning to perform distributed inference. BatchPredictor is a utility for large-scale batch inference that takes in a few components:

  1. Checkpoint
    • A saved model from training or tuning.
  2. Preprocessor
    • The Preprocessor defined earlier to transform input data for training; it can be reapplied to incoming batches (optional).
  3. Predictor
    • Loads model from checkpoint to perform inference.

The Checkpoint and Predictor are passed into each instance of BatchPredictor. Here, we take the best checkpoint from tuning and perform offline, or batch, inference on taxi tip data from June 2022:

import ray
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor

test_dataset = ray.data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2022.parquet"
).drop_columns("is_big_tip")
 
test_dataset = test_dataset.repartition(num_blocks=5)
# Obtain the best checkpointed result from the tuning step.
best_result = result_grid.get_best_result()
# Create a BatchPredictor from the best result and specify a Predictor class.
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint=best_result.checkpoint, predictor_cls=XGBoostPredictor
)
# Run batch inference.
# Prediction scales across heterogeneous hardware if specified in the ScalingConfig in the Trainer.
predicted_probabilities = batch_predictor.predict(test_dataset)

Inspect the predictions output by the BatchPredictor using predicted_probabilities.show():

predicted_probabilities.show(5)
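
The predictor returns tip probabilities; here is a small sketch of thresholding them into class labels, assuming the output column is named "predictions" (the usual default for AIR predictors):

# Convert probabilities into 0/1 labels with a 0.5 threshold.
predicted_labels = predicted_probabilities.map_batches(
    lambda batch: (batch[["predictions"]] > 0.5).astype(int),
    batch_format="pandas",
)
predicted_labels.show(5)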

Key Concepts

  • BatchPredictor
    • Loads the best model from a checkpoint to perform large-scale batch inference or online inference.

Ray Serve

Finally, you may want to serve this taxi tip prediction application to end users, ideally with low latency so it is maximally useful to drivers on the job. This poses a challenge, since machine learning models are compute intensive and, ideally, the model wouldn't be served in isolation, but rather alongside business logic or even other ML models.

Ray Serve is a scalable compute layer for serving machine learning models that enables serving individual models or creating composite model pipelines where you can independently deploy, update, and scale individual components.

Serve isn't tied to a specific machine learning library, but rather treats models as ordinary Python code.

Additionally, it allows you to flexibly combine normal Python business logic alongside machine learning models. This makes it possible to build online inference services completely end-to-end:

  • Validate user input.
  • Query a database.
  • Perform inference scalably across multiple ML models.
  • Combine, filter, and validate the output all in the process of handling a single inference request.

from ray import serve

# 1: Define a Ray Serve deployment.
@serve.deployment(route_prefix="/")
class MyModelDeployment:
    def __init__(self, msg: str):
        # Initialize model state: could be very large neural net weights.
        self._msg = msg

    def __call__(self, request):
        return {"result": self._msg}

# 2: Deploy the model.
serve.run(MyModelDeployment.bind(msg="Hello world!"))

To send some test traffic:

import requests
 
sample_input = test_dataset.take(1)
sample_input = dict(sample_input[0])
 
# 3: Query the deployment and print the result.
output = requests.post("http://localhost:8000/", json=[sample_input]).json()
print(output)

Key Concepts

Deployments

A managed group of Ray actors that can be addressed together and handle incoming requests, which are load-balanced across them.
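
To tie this back to the taxi model, here is a hedged sketch of serving the best checkpoint with the AIR PredictorDeployment mentioned in the table earlier; import paths and the HTTP adapter vary somewhat across Ray versions, so treat this as an outline rather than a definitive recipe:

from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import pandas_read_json
from ray.train.xgboost import XGBoostPredictor

# Serve the model behind an HTTP endpoint; JSON payloads are converted
# to pandas DataFrames by the HTTP adapter before inference.
serve.run(
    PredictorDeployment.options(name="XGBoostService").bind(
        XGBoostPredictor,
        best_result.checkpoint,
        http_adapter=pandas_read_json,
    )
)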

