Ray AI Runtime (AIR)
As described earlier, Ray AI Runtime (AIR) is a scalable and unified toolkit for ML applications. Built on top of Ray Core, Ray AIR inherits all the performance and scalability benefits offered by Core while providing a convenient abstraction layer for machine learning. AIR enables simple scaling of individual workloads, end-to-end workflows, and popular ecosystem frameworks, all in just Python.
Ray AIR wraps five native Ray libraries, each scaling a specific stage of the ML workflow. In addition, Ray AIR brings together an ever-growing ecosystem of integrations with popular machine learning frameworks to create a common interface for development.
For data scientists, AIR can be used to scale individual workloads, and also end-to-end ML applications. For ML Engineers, AIR provides scalable platform abstractions that can be used to easily onboard and integrate tooling from the broader ML ecosystem.
Here, we work through the entire ML workflow, from data loading to training and hyperparameter tuning to prediction and serving. Along the way, each section introduces key components of Ray AIR as we build a machine learning application that predicts big tips using New York City taxi data.
ML workflow stage | Ray AIR Component | Ray AIR key concept | NYC Taxi Use Case |
---|---|---|---|
Data loading and preprocessing | Ray Data | Preprocessor to load and transform data | Use Preprocessor to load and transform input data. |
Model training | Ray Train | Trainer for supported ML frameworks (Keras, Pytorch and more) | Use Trainer to scale XGBoost model training. |
Hyperparameter tuning | Ray Tune | Tuner for hyperparameter search | Use Tuner for hyperparameter search. |
Batch prediction | Ray AIR Predictor | BatchPredictor to load model from best checkpoint for batch inference | Use BatchPredictor to load model from best checkpoint for batch inference; part of Ray Train. |
Model serving | Ray Serve | PredictorDeployment for online inference | Use PredictorDeployment for online inference. |
For this classification task, we apply a simple XGBoost model (a gradient-boosted trees framework) to the June 2021 New York City Taxi & Limousine Commission Trip Record Data. This dataset contains over 2 million samples of yellow cab rides, and the goal is to predict whether a trip will result in a tip greater than 20% or not.
Ray Data
First, we need to load in the taxi data and transform the raw input into cleaned features that will be passed to the XGBoost model.
Backed by PyArrow, Ray Datasets parallelize the loading and transforming of data and provide a standard way to pass references to data across Ray libraries and applications. Datasets are not intended to replace more general data processing systems; instead, they serve as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.
Key features
- Flexibility: Datasets are compatible with a variety of file formats, data sources, and distributed frameworks. They work seamlessly with library integrations like Dask on Ray and can be passed between Ray tasks and actors without copying data.
- Performance for ML workloads: Datasets offer important features like accelerator support, pipelining, and global random shuffles that accelerate ML training and inference workloads. They also support basic distributed data transformations such as map, filter, sort, groupby, and repartition (see the short example after this list).
- Persistent Preprocessor: The `Preprocessor` primitive captures and stores the transformations applied to convert inputs into features. It is applied during training, tuning, batch prediction, and serving to keep the preprocessing consistent across the pipeline.
- Built on Ray Core: Datasets inherit scalability to hundreds of nodes, efficient memory usage, object spilling, and failure handling from Ray Core. Because Datasets are just lists of object references, they can be passed between tasks and actors without needing to make a copy of the data, which is crucial for making data-intensive applications and libraries scalable.
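As a quick illustration of these basic transformations, here is a minimal sketch on a small in-memory dataset (the toy items and column names are illustrative, not the taxi data used below):
```python
import ray

# Hypothetical toy dataset; the taxi example below reads Parquet from S3 instead.
ds = ray.data.from_items(
    [{"passenger_count": i % 4 + 1, "fare_amount": float(i)} for i in range(1000)]
)

# Basic distributed transformations compose directly on the Dataset.
ds = ds.repartition(num_blocks=8)
ds = ds.filter(lambda row: row["passenger_count"] > 1)
ds = ds.sort("fare_amount", descending=True)
ds.groupby("passenger_count").mean("fare_amount").show()
```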
A general pattern is to create a `Dataset`, configure a `Preprocessor`, and pass both into the `Trainer` for consistent data handling throughout the pipeline.
To transform our raw data into features, we will define a `Preprocessor`. Ray AIR's `Preprocessor` captures the data transformation you apply and persists:
- During training: The `Preprocessor` is passed into a `Trainer` to `fit` and `transform` input `Datasets`.
- During tuning: Each `Trial` creates its own copy of the `Preprocessor`, and the fitting and transformation logic occurs once per `Trial`.
- During checkpointing: The `Preprocessor` is saved in the `Checkpoint` if it was passed into the `Trainer`.
- During prediction: If the `Checkpoint` contains a `Preprocessor`, it is used to call `transform_batch` on input batches prior to performing inference.
Ray AIR provides several preprocessors out of the box and also supports the implementation of custom preprocessors (a sketch follows the example below).
```python
from ray import data
from ray.data.preprocessors import MinMaxScaler

# Read Parquet file to Ray Dataset
dataset = data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2021.parquet"
)

# Split datasets into blocks for parallel preprocessing.
# `num_blocks` should be lower than number of cores in the cluster.
dataset = dataset.repartition(num_blocks=100)

# Create a Preprocessor to transform the data.
preprocessor = MinMaxScaler(columns=["trip_distance", "trip_duration"])

# Apply the Preprocessor to the Dataset.
transformed_dataset = preprocessor.fit_transform(dataset)
```
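For transformations the built-in preprocessors don't cover, you can implement a custom preprocessor. The sketch below is illustrative only: it assumes the base class at `ray.data.preprocessor.Preprocessor` and the stateless-override hooks (`_transform_pandas`, `_is_fittable`) described in the user guide, and the `AddTipFraction` class and derived column are hypothetical:
```python
import pandas as pd

from ray.data.preprocessor import Preprocessor  # assumed base-class location


class AddTipFraction(Preprocessor):
    """Hypothetical stateless preprocessor that derives a tip_fraction feature."""

    # No fitting step is needed; the transformation is purely row-wise (assumption).
    _is_fittable = False

    def _transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        df["tip_fraction"] = df["tip_amount"] / df["fare_amount"]
        return df


# A stateless preprocessor can be applied directly, like any built-in one.
with_tip_fraction = AddTipFraction().transform(dataset)
```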
We can also do things like calculate the average `fare_amount` grouped by `passenger_count` in a distributed manner:
```python
dataset.groupby("passenger_count").mean("fare_amount").show()
```
We can also visualize the results, using the integration with pandas to generate a histogram view:
```python
# Visualize transformed data.
transformed_df = transformed_dataset.to_pandas(limit=2704905)
transformed_df.hist("trip_distance")
```
Key concepts
Dataset
The standard way to load and exchange data in Ray AIR. In AIR, Datasets are used extensively for data loading and transformation. They are meant as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.
Preprocessor
Preprocessors are primitives that transform input data into features. They operate on Datasets, making them scalable and compatible with a variety of data sources and DataFrame libraries.
Preprocessors persist through various stages of the pipeline:
- During training to fit and transform input data
- In each trial of hyperparameter tuning
- Within a checkpoint
- On input batches for inference
AIR comes with a collection of built-in preprocessors, and you can also define your own with simple templates (see the user guide for more information).
Ray Train
Here, we set up an XGBoost model to classify taxi rides based on predicted tip amounts using the dataset and preprocessor from the previous step. ML practitioners tend to run into a few common problems with training models that prompt them to consider distributed solutions:
- Training time is too long to be practical.
- The data is too large to fit on one machine.
- Training many models sequentially doesn't utilize resources efficiently.
- The model itself is too large to fit on a single machine.
Ray Train addresses these issues by improving performance through distributed multi-node training.
Ray Train's `Trainers` integrate well with the rest of the Ray ecosystem:
- Ray Data: Enables scalable data loading and preprocessing with Ray `Datasets` and `Preprocessors`.
- Ray Tune: Composes with `Tuners` for distributed hyperparameter tuning.
- Ray AIR Predictor: Applies the checkpointed, trained model during inference.
- Popular ML training frameworks such as XGBoost, PyTorch, and TensorFlow.
Ray Train also comes with some useful features:
- Callbacks for early stopping
- Checkpointing
- Integration with experiment tracking tools like TensorBoard, Weights & Biases, and MLflow (sketched below)
- Export mechanisms for models
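As a rough sketch of how experiment tracking can be wired in, callbacks are attached through the trainer's `RunConfig`; the MLflow callback's import location (`ray.air.integrations.mlflow` here) has moved between Ray releases, and the experiment names are illustrative:
```python
from ray.air.config import RunConfig
from ray.air.integrations.mlflow import MLflowLoggerCallback  # location varies by Ray version

# Hypothetical run configuration that logs each run to MLflow.
run_config = RunConfig(
    name="nyc_taxi_xgboost",
    callbacks=[MLflowLoggerCallback(experiment_name="nyc_taxi_big_tip")],
)
# `run_config` can then be passed to the Trainer alongside the ScalingConfig.
```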
The general flow is to define the `Trainer` object and then fit it to the training dataset. To construct a `Trainer`, you provide three base components:
- A `ScalingConfig`, which specifies how many parallel training workers to use and what type of resources (CPUs/GPUs) to use per worker during training; it supports seamless scaling across heterogeneous hardware (see the sketch below).
- A dictionary of training and validation sets.
- The `Preprocessor` used to transform the `Datasets`.
Optionally, you can add `resume_from_checkpoint`, which allows you to continue training from a saved checkpoint should the run be interrupted.
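To make the scaling knobs concrete, here is a minimal `ScalingConfig` sketch for a GPU cluster; the worker count and `resources_per_worker` values are illustrative assumptions rather than part of the tutorial:
```python
from ray.air.config import ScalingConfig

# Hypothetical GPU setup: four workers, each reserving 4 CPUs and 1 GPU.
gpu_scaling = ScalingConfig(
    num_workers=4,
    use_gpu=True,
    resources_per_worker={"CPU": 4, "GPU": 1},
)
```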
The snippet below uses an `XGBoostTrainer`; however, it may be swapped out with any integration or custom-defined `Trainer`:
```python
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

# Split the dataset from the previous step into training and validation sets.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

# Note: Trainers that take a custom `train_loop_per_worker` give each worker its
# assigned data via `session.get_dataset_shard("train")`, which can be iterated
# with `iter_batches()`; XGBoostTrainer handles dataset sharding internally.
trainer = XGBoostTrainer(
    label_column="is_big_tip",
    num_boost_round=50,
    scaling_config=ScalingConfig(
        num_workers=5,
        use_gpu=False,
    ),
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "tree_method": "approx",
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()
```
You can check out the training results from the `Result` object with the following calls:
```python
# returns last saved checkpoint
result.checkpoint

# returns the `n` best saved checkpoints, as configured in `RunConfig.checkpoint_config`
result.best_checkpoints

# returns the final metrics as reported
result.metrics

# returns an Exception if training failed
result.error
```
Key Concepts
Checkpoint
Stores the full state of the model periodically so that partially trained models are available and can be used to resume training from an intermediate point instead of starting from scratch; also allows the best model to be saved for batch inference later on.
Trainer
Trainers are wrapper classes around third-party training frameworks such as XGBoost, PyTorch, and TensorFlow. They are built to help integrate with Ray Actors (for distribution), Ray Datasets, and Ray Tune.
Ray Tune
Now that you have a baseline XGBoost model trained, you can try to improve performance by running hyperparameter tuning experiments.
Hyperparameter tuning, or hyperparameter optimization (HPO), is the process of choosing optimal hyperparameters for a machine learning model. Hyperparameters, in contrast to the weights learned by the model, are parameters that you set to influence training.
Setting up and executing hyperparameter optimization (HPO) can be expensive in terms of compute resources and runtime, with several complexities:
- Vast search space: Your model could have several hyperparameters, each with different data types, ranges, and possible correlations. Sampling good candidates from high-dimensional spaces is difficult.
- Search algorithms: Choosing hyperparameters strategically requires testing complex search algorithms to achieve good results.
- Long runtime: Even with distributed tuning, training a complex model can take a long time per run, so it's best to be efficient at every stage of the pipeline.
- Resource allocation: You must have enough compute resources available during each trial so that the search isn't slowed down by scheduling mismatches.
- User experience: Observability tooling for developers, such as stopping bad runs early, saving intermediate results, restarting from checkpoints, and pausing/resuming runs, makes HPO easier.
Ray Tune is a distributed HPO library that addresses all of the complexities above, provides a simplified interface for running trials, and integrates with popular frameworks such as HyperOpt and Optuna.
The general pattern for using AIR `Tuners` involves taking in a `Trainer`, defining a search space, establishing a search algorithm, scheduling trials, and analyzing results.
```python
from ray import tune
from ray.tune import Tuner, TuneConfig

param_space = {
    "params": {
        "eta": tune.uniform(0.2, 0.4),  # learning rate
        "max_depth": tune.randint(1, 6),  # default=6; higher value means more complex tree
        "min_child_weight": tune.uniform(0.8, 1.0),  # min sum of weights of all data in a child
    }
}

tuner = Tuner(
    trainer,
    param_space=param_space,
    tune_config=TuneConfig(num_samples=3, metric="train-logloss", mode="min"),
)
result_grid = tuner.fit()
```
As an aside, Ray Tune provides a default checkpointing system for hyperparameter tuning. For particularly large models, it may be advisable to set up a `CheckpointConfig`, which defines a checkpointing strategy. In particular, you can adjust `num_to_keep` to avoid saving fruitless trials to disk.
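As a hedged sketch (the specific values and score attribute below are illustrative, not from the tutorial), a `CheckpointConfig` that keeps only the best checkpoints per trial is attached through the `RunConfig`:
```python
from ray.air.config import RunConfig, CheckpointConfig

# Keep only the two best checkpoints per trial, ranked by validation log loss.
checkpointing = CheckpointConfig(
    num_to_keep=2,
    checkpoint_score_attribute="valid-logloss",
    checkpoint_score_order="min",
)
run_config = RunConfig(name="nyc_taxi_tuning", checkpoint_config=checkpointing)
# `run_config` can be passed to the Trainer and is picked up by the Tuner.
```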
To set up an AIR `Tuner`, you must specify:
- `Trainer`: The training loop from before; support for heterogeneous hardware is built in via each Trainer's `ScalingConfig`.
- `param_space`: A set of hyperparameters you wish to tune.
- `TuneConfig`: Sets the number of experiments, the metrics, and whether to minimize or maximize them.
- `search_algorithm`: Optimizes parameter search (optional).
- `scheduler`: Stops searches early and speeds up experiments (optional; see the sketch after this list).
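For example, a search algorithm and scheduler can be plugged into the `TuneConfig` roughly as follows. This is a sketch that assumes the Optuna integration (`ray.tune.search.optuna.OptunaSearch`, which requires the `optuna` package) and the ASHA scheduler; it is not part of the tutorial's baseline run:
```python
from ray.tune import Tuner, TuneConfig
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch  # requires `pip install optuna`

tuner = Tuner(
    trainer,
    param_space=param_space,
    tune_config=TuneConfig(
        num_samples=20,
        metric="train-logloss",
        mode="min",
        search_alg=OptunaSearch(),  # smarter sampling of the search space
        scheduler=ASHAScheduler(),  # stops unpromising trials early
    ),
)
```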
You can probe the `ResultGrid` for metrics using these calls:
```python
# checks if there have been errors
result_grid.errors

# gets the best result
best_result = result_grid.get_best_result()

# gets the best checkpoint
best_checkpoint = best_result.checkpoint

# gets the best metrics
best_metrics = best_result.metrics
```
Key Concepts
Tuner
Provides an interface that works with AIR `Trainers` to perform distributed hyperparameter tuning. You define a set of hyperparameters you wish to tune in a search space and specify a search algorithm, and the `Tuner` returns its results in a `ResultGrid` that contains metrics, results, and checkpoints for each trial.
Ray AIR Predictors
Ray AIR Predictors load models from checkpoints generated during training or tuning to perform distributed inference. `BatchPredictor` is a utility for large-scale batch inference that takes in a few components:
- `Checkpoint`: A saved model from training or tuning.
- `Preprocessor`: The preprocessor defined earlier to transform input data for training; it can be reapplied to incoming batches (optional).
- `Predictor`: Loads the model from the checkpoint to perform inference.
The `Checkpoint` and `Predictor` are passed into each instance of `BatchPredictor`. Here we take the best checkpoint from tuning and perform offline, or batch, inference on taxi tip data from June 2022:
```python
import ray
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor

test_dataset = ray.data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2022.parquet"
).drop_columns("is_big_tip")
test_dataset = test_dataset.repartition(num_blocks=5)

# Obtain the best checkpointed result from the tuning step.
best_result = result_grid.get_best_result()

# Create a BatchPredictor from the best result and specify a Predictor class.
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint=best_result.checkpoint, predictor_cls=XGBoostPredictor
)

# Run batch inference.
# Prediction scales across heterogeneous hardware if specified in the ScalingConfig in the Trainer.
predicted_probabilities = batch_predictor.predict(test_dataset)
```
Inspect the predictions outputted by `BatchPredictor` by using `predicted_probabilities.show()`:
```python
predicted_probabilities.show(5)
```
Key Concepts
BatchPredictor
Loads the best model from a checkpoint to perform large-scale batch inference or online inference.
Ray Serve
Finally, you may want to serve this taxi tip prediction application to end users, ideally with low latency so it is maximally useful to drivers on the job. This poses a challenge because machine learning models are compute intensive and, ideally, the model wouldn't be served in isolation but alongside business logic or even other ML models.
Ray Serve is a scalable compute layer for serving machine learning models that enables serving individual models or creating composite model pipelines where you can independently deploy, update, and scale individual components.
Serve isn't tied to a specific machine learning library, but rather treats models as ordinary Python code.
Additionally, it allows you to flexibly combine normal Python business logic alongside machine learning models (see the sketch after this list). This makes it possible to build online inference services completely end-to-end:
- Validate user input.
- Query a database.
- Perform inference scalably across multiple ML models.
- Combine, filter, and validate the output all in the process of handling a single inference request.
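As a rough illustration of that kind of composition (a sketch only: the `TipService` class, `dummy_model` stand-in, and threshold are hypothetical, not the tutorial's actual service), business logic and a model can live together in one deployment:
```python
from ray import serve

@serve.deployment
class TipService:
    def __init__(self, model):
        # `model` is any plain Python predict callable (hypothetical stand-in).
        self._model = model

    async def __call__(self, request):
        payload = await request.json()
        # 1. Validate user input before touching the model.
        if "trip_distance" not in payload:
            return {"error": "trip_distance is required"}
        # 2. Run inference.
        score = self._model(payload)
        # 3. Post-process the raw score before returning it.
        return {"is_big_tip": score > 0.5}

# Hypothetical predict function standing in for a trained model.
def dummy_model(features: dict) -> float:
    return 0.9 if features["trip_distance"] > 5 else 0.1

serve.run(TipService.bind(dummy_model))
```
The minimal deployment example below shows the core Serve API on its own: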
```python
from ray import serve

# 1: Define a Ray Serve deployment.
@serve.deployment(route_prefix="/")
class MyModelDeployment:
    def __init__(self, msg: str):
        # Initialize model state: could be very large neural net weights.
        self._msg = msg

    def __call__(self, request):
        return {"result": self._msg}

# 2: Deploy the model.
serve.run(MyModelDeployment.bind(msg="Hello world!"))
```
To send some test traffic:
```python
import requests

sample_input = test_dataset.take(1)
sample_input = dict(sample_input[0])

# 3: Query the deployment and print the result.
output = requests.post("http://localhost:8000/", json=[sample_input]).json()
print(output)
```
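To serve the trained taxi-tip model itself rather than the hello-world deployment above, AIR's `PredictorDeployment` can wrap the best checkpoint for online inference. The sketch below assumes the Ray 2.x import locations (`ray.serve.PredictorDeployment`, `ray.serve.http_adapters.pandas_read_json`), which have shifted between releases, and the service name and route are illustrative:
```python
from ray import serve
from ray.serve import PredictorDeployment  # assumed import location
from ray.serve.http_adapters import pandas_read_json  # parses JSON rows into a DataFrame
from ray.train.xgboost import XGBoostPredictor

# Deploy the best checkpoint from tuning behind an HTTP endpoint.
serve.run(
    PredictorDeployment.options(name="XGBoostService", route_prefix="/rayair").bind(
        XGBoostPredictor,
        best_result.checkpoint,
        http_adapter=pandas_read_json,
    )
)
# The `requests.post(...)` snippet above can then target http://localhost:8000/rayair.
```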
Key Concepts
Deployments
A managed group of Ray actors that can be addressed together and will handle requests load-balanced across them.