Distributed Python with Ray

To get a better feel for Ray, this section scales a bare-bones version of a common ML task: regression on structured (tabular) data.

The Data

Say we want to perform regression on the California Housing dataset made available by scikit-learn, predicting the median house value of a district from features such as median income, house age, and location. We prepare the dataset:

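from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

X, y = fetch_california_housing(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=201
)
X.head(n=5)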
MedInc  HouseAge  AveRooms  AveBedrms  Population  AveOccup  Latitude  Longitude
8.3252      41.0  6.984127   1.023810       322.0  2.555556     37.88    -122.23
8.3014      21.0  6.238137   0.971880      2401.0  2.109842     37.86    -122.22
7.2574      52.0  8.288136   1.073446       496.0  2.802260     37.85    -122.24
5.6431      52.0  5.817352   1.073059       558.0  2.547945     37.85    -122.25
3.8462      52.0  6.281853   1.081081       565.0  2.181467     37.85    -122.25

We want to train random forest models and score them using mean squared error (MSE) as the metric.
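For reference, mean squared error is the average of the squared differences between the true and predicted values, so lower is better. The pure-Python function below is a sketch of what scikit-learn's mean_squared_error computes in the default unweighted, single-output case; the name mse is our own and not part of scikit-learn.

```python
def mse(y_true: list[float], y_pred: list[float]) -> float:
    """Mean squared error: the average of (true - predicted) ** 2."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if not y_true:
        raise ValueError("inputs must be non-empty")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return sum(squared_errors) / len(squared_errors)


print(mse([3.0, -0.5, 2.0, 7.0], [2.5, 0.0, 2.0, 8.0]))  # 0.375
```

Squaring penalizes large errors more than small ones, and the result is in squared units of the target.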

Sequential implementation

As a lightweight form of hyperparameter tuning, you will train many models with different values of n_estimators (the number of trees in the forest). First comes the sequential version, in which each experiment starts only after the previous one finishes, as depicted in the timeline diagram below. Each "task" here is training and scoring one random forest model.

[Figure: sequential timeline]

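import time

import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# Set number of models to train
# You will use NUM_MODELS as a benchmark to compare performance
# across sequential and parallel implementations.
NUM_MODELS = 20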
 
def train_and_score_model(
    train_set: pd.DataFrame,
    test_set: pd.DataFrame,
    train_labels: pd.Series,
    test_labels: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    start_time = time.time()  # measure wall time for single model training
 
    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set, train_labels)
    y_pred = model.predict(test_set)
    score = mean_squared_error(test_labels, y_pred)
 
    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )
 
    return n_estimators, score
 
def run_sequential(n_models: int) -> list[tuple[int, float]]:
    return [
        train_and_score_model(
            train_set=X_train,
            test_set=X_test,
            train_labels=y_train,
            test_labels=y_test,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]

The function train_and_score_model takes the data, creates a RandomForestRegressor, trains it, and scores it on the test set using mean squared error. run_sequential trains n_models models one after another with an increasing number of trees: n_estimators starts at 8 and grows by 4 for each model (8, 12, 16, 20, ...).
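The schedule 8 + 4 * j used in run_sequential is an arithmetic progression. This small sketch (the helper name n_estimators_grid is hypothetical) makes the values explicit: with NUM_MODELS = 20 the last model has 84 trees, matching the final line of the sequential output, and 920 trees are trained in total.

```python
def n_estimators_grid(n_models: int, start: int = 8, step: int = 4) -> list[int]:
    """Return the n_estimators values used by run_sequential: start + step * j."""
    return [start + step * j for j in range(n_models)]


grid = n_estimators_grid(20)
print(grid[:4], "...", grid[-1])  # [8, 12, 16, 20] ... 84
print("total trees trained:", sum(grid))  # total trees trained: 920
```

Because training cost grows roughly with the number of trees, later models take longer, which is visible in the timings below.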

Running the sequential model training

%%time
mse_scores = run_sequential(n_models=NUM_MODELS)

Outputs:

n_estimators=8,  mse=0.2983, took: 1.06 seconds
n_estimators=12, mse=0.2826, took: 1.57 seconds
n_estimators=16, mse=0.2761, took: 2.09 seconds
n_estimators=20, mse=0.2716, took: 2.61 seconds
n_estimators=24, mse=0.2694, took: 3.32 seconds
n_estimators=28, mse=0.2686, took: 4.19 seconds
n_estimators=32, mse=0.2662, took: 4.70 seconds
n_estimators=36, mse=0.2663, took: 5.00 seconds
n_estimators=40, mse=0.2635, took: 5.62 seconds
n_estimators=44, mse=0.2622, took: 6.16 seconds
n_estimators=48, mse=0.2616, took: 6.78 seconds
n_estimators=52, mse=0.2609, took: 7.30 seconds
n_estimators=56, mse=0.2614, took: 7.84 seconds
n_estimators=60, mse=0.2608, took: 8.72 seconds
n_estimators=64, mse=0.2613, took: 9.19 seconds
n_estimators=68, mse=0.2615, took: 9.78 seconds
n_estimators=72, mse=0.2617, took: 11.60 seconds
n_estimators=76, mse=0.2614, took: 10.78 seconds
n_estimators=80, mse=0.2607, took: 10.87 seconds
n_estimators=84, mse=0.2601, took: 11.12 seconds
CPU times: total: 1min 54s
Wall time: 2min 10s

The wall time (2min 10s) is the total time it took to train all 20 models sequentially.
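Because the models train one after another, the wall time should be roughly the sum of the individual training times. Adding up the twenty "took:" values copied from the output above (nothing is re-measured here) shows this:

```python
# Per-model training times in seconds, copied from the sequential output above.
sequential_times = [
    1.06, 1.57, 2.09, 2.61, 3.32, 4.19, 4.70, 5.00, 5.62, 6.16,
    6.78, 7.30, 7.84, 8.72, 9.19, 9.78, 11.60, 10.78, 10.87, 11.12,
]

total_seconds = sum(sequential_times)
minutes, seconds = divmod(total_seconds, 60)
print(f"sum of task times: {total_seconds:.2f} s ({int(minutes)}min {seconds:.0f}s)")
# sum of task times: 130.30 s (2min 10s)
```

The total matches the 2min 10s wall time reported by %%time, which is exactly what a strictly sequential run should produce.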

Parallel implementation

Using Ray, you can take Python code that runs sequentially and turn it into a distributed application with minimal code changes. Parallel and distributed computing are a staple of modern applications, but parallelizing or distributing existing Python code can mean rewriting it, sometimes from scratch. Additionally, modern applications have requirements that existing modules such as multiprocessing do not meet. These requirements include:

  • Running the same code on more than one machine
  • Building microservices and actors that have state and can communicate
  • Graceful handling of machine failures and preemption
  • Efficient handling of large objects and numerical data

Ray satisfies these requirements and lets you scale your applications without rewriting them. Here, distributing the training runs with Ray Core lets us train the same models in less wall time. In contrast to the sequential approach, the models are now trained in parallel using all available resources: Ray automatically detects the number of CPU cores on your machine, or the resources available in a cluster, and schedules each task accordingly.

The diagram below offers an intuition for how tasks are assigned and executed in a parallel approach. It depicts a generic timeline with ten tasks running in parallel across four workers, with minor overhead from the scheduler. The scheduler is responsible for managing incoming requests, assigning tasks to nodes, and detecting available resources.

[Figure: distributed timeline]

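import ray

# Shut down any Ray instance left over from a previous run of this cell.
# Note: is_initialized must be called; the bare function object is always truthy.
if ray.is_initialized():
    ray.shutdown()

ray.init()
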
Here, running ray.init() starts a new local Ray instance. It also prints a link to the Ray Dashboard, an observability tool that provides insight into what Ray is doing through metrics and charts you can use to monitor the cluster.

2023-09-12 14:23:29,810	INFO worker.py:1612 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265

The Object Store

[Figure: Object Store]

Workers use ray.put() to place objects in, and ray.get() to retrieve them from, each node's object store. Together, these object stores form the shared distributed memory that makes objects available across a Ray cluster.

In Ray, an object reference (ObjectRef) is a pointer to an object in the object store. Tasks and actors use object references to access objects stored on other machines, which lets processes across the cluster share data. For example:

X_train_ref = ray.put(X_train)
X_test_ref = ray.put(X_test)
y_train_ref = ray.put(y_train)
y_test_ref = ray.put(y_test)

Once the training and test data are in Ray's object store, they are available to all remote tasks and actors in the cluster.

print(X_train_ref)

This prints something like:

ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002000000)

To retrieve X_train, call ray.get() on its object reference, e.g. ray.get(X_train_ref).

Implementing Parallel Training

@ray.remote
def train_and_score_model(
    train_set_ref: pd.DataFrame,
    test_set_ref: pd.DataFrame,
    train_labels_ref: pd.Series,
    test_labels_ref: pd.Series,
    n_estimators: int,
) -> tuple[int, float]:
    start_time = time.time()  # measure wall time for single model training
 
    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set_ref, train_labels_ref)
    y_pred = model.predict(test_set_ref)
    score = mean_squared_error(test_labels_ref, y_pred)
 
    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )
 
    return n_estimators, score

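Notice that train_and_score_model is the same function as in the sequential example, except that we add the @ray.remote decorator, which turns it into a Ray task that can run in a separate worker process, locally or elsewhere in the cluster. The parameters end in _ref but are annotated as pd.DataFrame and pd.Series because Ray resolves object references passed directly as task arguments into their values before the function body runs.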

def run_parallel(n_models: int) -> list[tuple[int, float]]:
    results_ref = [
        train_and_score_model.remote(
            train_set_ref=X_train_ref,
            test_set_ref=X_test_ref,
            train_labels_ref=y_train_ref,
            test_labels_ref=y_test_ref,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]
    return ray.get(results_ref)

run_parallel() is responsible for parallel model training. Earlier, we defined run_sequential() to train and score NUM_MODELS models. Working from the inside out, turning it into run_parallel() involves three steps:

  1. Append .remote to the train_and_score_model call.
    • Remember that you declared this function as a remote task with @ray.remote. In Ray, every call to a remote function uses this suffix.
  2. Capture the resulting list of object references in results_ref.
    • Rather than waiting for the results, you immediately receive a list of references to results that will be available in the future. This asynchronous (non-blocking) call lets the program continue with other work while the time-consuming training runs in the background.
  3. Access the results with ray.get().
    • Once all training tasks have been submitted, call ray.get() on the list of object references results_ref to retrieve the results. This is a synchronous (blocking) operation: it waits until every referenced task has finished.

For example,

ray.get([ObjectRef, ObjectRef, ObjectRef, ...])

returns a list of (n_estimators, score) tuples.

Running the parallel model training

%%time
mse_scores = run_parallel(n_models=NUM_MODELS)

Outputs:

(train_and_score_model pid=7580) n_estimators=8, mse=0.2983, took: 1.65 seconds
(train_and_score_model pid=2228) n_estimators=36, mse=0.2663, took: 7.30 seconds [repeated 7x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)
(train_and_score_model pid=1576) n_estimators=68, mse=0.2615, took: 12.72 seconds [repeated 8x across cluster]
CPU times: total: 31.2 ms
Wall time: 20.1 s
(train_and_score_model pid=8580) n_estimators=84, mse=0.2601, took: 13.51 seconds

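Here the wall time is 20.1 s, roughly a 6.5x speedup over the sequential implementation, which took 2min 10s (130 s / 20.1 s ≈ 6.5). The reported CPU time drops to 31.2 ms because %%time only measures the notebook process, which now just submits tasks and waits, while the training itself runs in separate Ray worker processes. At the end, we disconnect the worker and terminate the processes started by ray.init() using: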

ray.shutdown()

Conclusion & Summary

You achieve a significant performance gain by introducing parallel model training. You can easily adapt a sequential model training computational job to run in parallel by using the Ray Core API.

With Ray, you can parallelize training without implementing the orchestration, fault tolerance, or autoscaling components yourself, which would otherwise require specialized knowledge of distributed systems.

Key concepts

  1. Tasks. Remote, stateless Python functions.
  2. Actors. Remote, stateful Python classes.
  3. Objects. Tasks and actors create and compute on objects that can be stored and accessed anywhere in the cluster; objects are cached in Ray's distributed shared-memory object store.

Key API elements

  • ray.init()
    Starts the Ray runtime and connects to a Ray cluster.
  • @ray.remote
    Decorator that specifies a Python function or class to be executed as a task (remote function) or actor (remote class) in a different process.
  • .remote
    Suffix for calls to remote functions and classes; remote operations are asynchronous.
  • ray.put()
    Puts an object in the in-memory object store and returns an object reference used to pass the object to any remote function or method call.
  • ray.get()
    Gets one or more remote objects from the object store by specifying their object reference(s).

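Changes

The before-and-after schematic below summarizes the changes; the data-loading function, model, and hyperparams are placeholders rather than real APIs. The sequential version: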

# data loaded from S3 / local disk
data = load_from_s3(...)

# fit and score model
def fit_and_score_model(data, model, hyperparams):
    model.init(hyperparams)
    model.fit(data["train"])
    score = model.score(data["test"])
    return model, score

# train 100 models sequentially
def run_sequential():
    results_list = [
        fit_and_score_model(data, model, hyperparams) for _ in range(100)
    ]
    return results_list

# run model training
results = run_sequential()

Becomes:

import ray

# start Ray runtime
ray.init()

# data loaded from S3 / local disk
data = load_from_s3(...)

# put data in the object store once; tasks receive a reference
data_ref = ray.put(data)

# fit and score model
@ray.remote
def fit_and_score_model(data, model, hyperparams):
    model.init(hyperparams)
    model.fit(data["train"])
    score = model.score(data["test"])
    return model, score

# train 100 models in parallel
def run_parallel():
    results_list = [
        fit_and_score_model.remote(data_ref, model, hyperparams) for _ in range(100)
    ]
    return ray.get(results_list)

# run model training
results = run_parallel()

# shutdown Ray runtime
ray.shutdown()