Cloud & MLOps ☁️
Remote Objects

Overview

In Ray, tasks and actors create and compute on objects. We refer to these objects as remote objects because they can be stored anywhere in a Ray cluster, and we use object refs to refer to them. Remote objects are cached in Ray's distributed shared-memory object store, and there is one object store per node in the cluster. In a cluster setting, a remote object can live on one or many nodes, independent of who holds the object ref(s). Collectively, these per-node object stores form a single shared object store across the Ray cluster, as shown in the diagram below.

Remote Objects reside in a distributed shared-memory object store.

Ray architecture with Ray nodes, each with its own object store. Collectively, it's a shared object store across the cluster

Objects are immutable and can be accessed from anywhere on the cluster, as they are stored in the cluster shared memory. An object ref is essentially a pointer or a unique ID that can be used to refer to a remote object without seeing its value. If you're familiar with futures in Python, Java or Scala, Ray object refs are conceptually similar.

In general, small objects (up to ~100KB) are stored in their owner's in-process store, while larger objects are stored in the distributed object store. This decision is meant to reduce the memory footprint and resolution time for each object. Note that in the latter case, a placeholder object is stored in the in-process store to indicate the object has been promoted to shared memory.
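
The distinction is invisible to the caller. Below is a minimal sketch (my own example, assuming a locally running Ray instance; the ~100KB cutoff is an internal Ray default and may change between versions) showing that ray.put() and ray.get() behave the same regardless of where the object ends up:

import numpy as np
import ray

ray.init(ignore_reinit_error=True)

small_ref = ray.put(b"x" * 1_000)         # ~1KB: kept in the owner's in-process store
large_ref = ray.put(np.zeros(1_000_000))  # ~8MB: promoted to the shared-memory object store

# Both resolve the same way from the caller's point of view.
print(len(ray.get(small_ref)), ray.get(large_ref).shape)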

If there is no space left in shared memory, objects are spilled to disk. Ray 1.3+ spills objects to external storage once the object store is full. By default, objects are spilled to Ray's temporary directory in the local filesystem. You can also specify multiple directories for spilling to spread the IO load and disk space usage across multiple physical devices if needed (e.g., SSD devices). To enable object spilling to remote storage (any URI supported by smart_open):

import json
import ray

ray.init(
    _system_config={
        "max_io_workers": 4,  # More IO workers for remote storage.
        "min_spilling_size": 100 * 1024 * 1024,  # Spill at least 100MB at a time.
        "object_spilling_config": json.dumps(
            {
                "type": "smart_open",
                "params": {
                    "uri": ["s3://bucket/path1", "s3://bucket/path2", "s3://bucket/path3"],
                },
                "buffer_size": 100 * 1024 * 1024,  # Use a 100MB buffer for writes.
            },
        ),
    },
)

The key point is that shared memory allows zero-copy access for processes on the same worker node.

Ray object store

Since Ray processes do not share memory space, data transferred between workers and nodes needs to be serialized and deserialized. Ray uses the Plasma object store to efficiently transfer objects across different processes and different nodes. Numpy arrays in the object store are shared between workers on the same node (zero-copy deserialization). Ray optimizes for numpy arrays by using Pickle protocol 5 with out-of-band data. Whenever possible, use numpy arrays or Python collections of numpy arrays for maximum performance.
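
As a quick illustration (a sketch, assuming a local Ray instance), a large numpy array fetched from the object store on the same node is a zero-copy view backed by shared memory, which is why it comes back read-only:

import numpy as np
import ray

ray.init(ignore_reinit_error=True)

arr_ref = ray.put(np.ones(1_000_000))
arr = ray.get(arr_ref)

# The returned array is a zero-copy, read-only view backed by shared memory.
print(arr.flags.writeable)  # False
print(arr[:3])              # Reads are fine; writing would require arr.copy()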

Object References as Futures Pattern

To start, we'll create some Python objects and put them in shared memory using the Ray Core APIs:

  • ray.put() - put an object in the in-memory object store and return its ObjectRef. Use this ObjectRef to pass the object to any remote task or Actor method call
  • ray.get() - get the value of a remote object, or a list of remote objects, from the object store

Ray object store

This diagram shows workers on worker nodes using ray.put() to place values and ray.get() to retrieve them from each node's object store. If a worker node does not have the value for an ObjectRef, it is fetched or copied from the node of the worker that created it.

For a demo, let's create a function that returns a random tensor of a given shape. We will store this tensor in our object store and retrieve it later for processing.

import random
from typing import Tuple

import ray
import torch

def create_rand_tensor(size: Tuple[int, int]) -> torch.Tensor:
    return torch.randn(size=size, dtype=torch.float)
 
@ray.remote
def transform_rand_tensor(tensor: torch.Tensor) -> torch.Tensor:
    return torch.mul(tensor, random.randint(2, 10))

Create & Store in Object Store

Next, let's create random tensors and store them in the object store:

torch.manual_seed(42)
# Create a tensor of shape (X, 50)
tensor_list_obj_ref = [ray.put(create_rand_tensor(((i+1)*25, 50))) for i in range(0, 100)]
tensor_list_obj_ref[:2], len(tensor_list_obj_ref)

Here, we:

  1. create a random tensor
  2. put it in the object store
  3. the list comprehension returns a list of ObjectRefs

We can see the ObjectRefs in the output:

(
    [
        ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000065e1f505),
        ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000066e1f505)
    ],
    100
)

You can view the object store in the Ray Dashboard.

Fetch from the object store

Next, we can fetch the random tensors from the object store by retrieving the values of the object references.

Small objects are resolved by copying them directly from the owner's in-process store. For example, if the owner calls ray.get, the system looks up and deserializes the value from the local in-process store. Larger objects (greater than 100KB) are stored in, and fetched from, the distributed object store.

# Since we got back a list of ObjectRefs, index into the first value of the tensor from
# the list of ObjectRefs
val = ray.get(tensor_list_obj_ref[0])
val.size()

Which returns:

torch.Size([25, 50])

Alternatively, you can fetch all the values of multiple object references by using results = ray.get(tensor_list_obj_ref).
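
For example:

# Fetch all the tensors at once; ray.get on a list of ObjectRefs returns
# the values in the same order as the references.
results = ray.get(tensor_list_obj_ref)
len(results), results[0].size()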

Transform tensors stored in the object store

Let's transform the tensors stored in the object store, put the transformed tensors in the object store (the Ray remote task will implicitly store them as returned values), and then fetch the values.

Transform each tensor in the object store with a remote task in a Python list comprehension:

transformed_object_list = [transform_rand_tensor.remote(t_obj_ref) for t_obj_ref in tensor_list_obj_ref]
transformed_object_list[:2]

Again, we can fetch all the transformed tensors:

transformed_tensor_values = ray.get(transformed_object_list)

Recap

Ray's object store is a shared-memory store spanning a Ray cluster. Workers on each Ray node have their own object store, and they can use simple Ray APIs, ray.put() and ray.get(), to insert values and fetch the values of Ray objects created by Ray tasks or Actor methods. Collectively, these individual object stores per node comprise a shared and distributed object store.

In the above exercise, we created random tensors, inserted them into our object store, transformed them by iterating over each ObjectRef and sending it to a Ray task, and then fetched the transformed tensor returned by each Ray remote task.

Passing Objects by Reference

Ray object references can be freely passed around a Ray application. This means that they can be passed as arguments to tasks, actor methods, and even stored in other objects. Objects are tracked via distributed reference counting, and their data is automatically freed once all references to the object are deleted.
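
As a small sketch of what this means in practice (my own example, assuming a running Ray instance): once the last reference to an object is deleted, Ray can reclaim its memory.

import numpy as np
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def total(arr):
    return arr.sum()

big_ref = ray.put(np.zeros(1_000_000))  # lives in the shared object store
sum_ref = total.remote(big_ref)         # the task holds a reference while it runs
print(ray.get(sum_ref))                 # 0.0

# Deleting the driver's references lets Ray free the underlying object
# once no task, actor, or other object still refers to it.
del big_ref, sum_ref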

First, some setup for the following examples:

# Define a Task
@ray.remote
def echo(x):
    print(f"current value of argument x: {x}")
 
# Define some variables
x = list(range(10))
obj_ref_x = ray.put(x)
y = 25

When we define functions in Python, we may need to pass argument values to the function's parameters when calling it. There are two ways to pass argument values to a function's parameters:

  • Pass (Call) by Value
  • Pass (Call) by Reference

Pass-by-value

In pass by value (also known as call by value), the argument passed to the function is a copy of the original object. If we change or update the value of the object inside the function, the original object does not change. If the argument values being passed to the function are large, the copying can take a lot of time and memory.

For example, send the object y to a task as a top-level argument. The object will be de-referenced automatically, so the task only sees its value.

# send y as value argument
echo.remote(y)

Returns:

ObjectRef(298e3e66d66deed9ffffffffffffffffffffffff0100000001000000)
(echo pid=9792) current value of argument x: 25

We can also send an ObjectRef to a task as a top-level argument. Ray will automatically de-reference it, so the echo function sees the value of the object.

echo.remote(obj_ref_x)

Returns:

ObjectRef(359ec6ce30d3ca2dffffffffffffffffffffffff0100000001000000)
(echo pid=26432) current value of argument x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Fixing “assignment destination is read-only”

Because Ray puts numpy arrays in the object store, they become read-only when deserialized as arguments in remote functions. For example, the following code snippet will crash:

import ray
import numpy as np
 
@ray.remote
def f(arr):
    # arr = arr.copy()  # Adding a copy will fix the error.
    arr[0] = 1
 
try:
    ray.get(f.remote(np.zeros(100)))
except ray.exceptions.RayTaskError as e:
    print(e)
# ray.exceptions.RayTaskError(ValueError): ray::f()
#   File "test.py", line 6, in f
#     arr[0] = 1
# ValueError: assignment destination is read-only

To avoid this issue, you can manually copy the array at the destination if you need to mutate it (arr = arr.copy()). Note that this is effectively like disabling the zero-copy deserialization feature provided by Ray.

Pass-by-reference

In pass by reference (also known as call by reference), the argument passed to the function's parameter is the original object. If we change the value of the object inside the function, the original object also changes.

When an object ref is passed inside a Python list or any other data structure, it is preserved, meaning it is not de-referenced. The object data is not transferred to the worker until ray.get() is called on the reference (see the sketch after the examples below).

You can pass by reference in two ways:

  1. as a dictionary .remote({"obj": obj_ref_x})
  2. as a list of ObjectRefs .remote([obj_ref_x])

x = list(range(20))
obj_ref_x = ray.put(x)
# Echo will not automatically de-reference it
echo.remote({"obj": obj_ref_x})

Returns:

ObjectRef(5b39a414803e3f8effffffffffffffffffffffff0100000001000000)
(echo pid=9792) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff01000000cae1f505)}

Using a list of object references:

echo.remote([obj_ref_x])

Similarly returns:

ObjectRef(66736a23c9cfb453ffffffffffffffffffffffff0100000001000000)
(echo pid=9792) current value of argument x: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff01000000cae1f505)]
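
In both cases, the task has to call ray.get() itself if it needs the underlying value. A minimal sketch (fetch_value below is a hypothetical helper, not part of the example above):

@ray.remote
def fetch_value(wrapped):
    # The ref inside the dict was not de-referenced automatically,
    # so the task fetches the value explicitly.
    value = ray.get(wrapped["obj"])
    return sum(value)

print(ray.get(fetch_value.remote({"obj": obj_ref_x})))  # 190 for list(range(20))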

Long-Running Tasks

Sometimes you may have tasks that run long past their expected time due to some problem, perhaps blocked on accessing an object in the object store. How do you avoid waiting on them forever? Use a timeout!

Now let's set a timeout to return early from an attempted access of a remote object that is blocking for too long...

import time
 
@ray.remote
def long_running_function():
    time.sleep(10)
    return 42

You can control how long you want to wait for the task to finish:

%%time
from ray.exceptions import GetTimeoutError
 
obj_ref = long_running_function.remote()
 
try:
    ray.get(obj_ref, timeout=6)
except GetTimeoutError:
    print("`get` timed out")

Returns:

`get` timed out
CPU times: total: 0 ns
Wall time: 6.01 s
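
Note that the timeout only makes ray.get() return early; the task itself keeps running in the background. If you actually want to stop it, Ray also provides ray.cancel() on the task's object reference. A minimal sketch (see Ray's documentation on task cancellation for the exact semantics; the exception you observe can depend on whether the task had already started):

obj_ref = long_running_function.remote()

# Request cancellation of the task via its object reference.
ray.cancel(obj_ref)

try:
    ray.get(obj_ref)
except (ray.exceptions.TaskCancelledError, ray.exceptions.RayTaskError) as e:
    print(f"Task was cancelled: {e}")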

Distributed Batch Inference

Batch inference is a common distributed application workload in machine learning. It is the process of using a trained model to generate predictions for a collection of observations. It has the following primary elements:

  • Input dataset: A large collection of observations to generate predictions for. The data is usually stored in an external storage system like S3, HDFS, or a database, across many files.
  • ML model: A trained ML model that is usually also stored in an external storage system or in a model store.
  • Predictions: The outputs from applying the ML model to observations. Predictions are normally written back to the storage system.

For the purposes of this exercise, we make the following provisions:

  • create a dummy model that returns some fake predictions
  • use real-world NYC taxi data to provide a large dataset for batch inference
  • return the predictions instead of writing them back to disk

As an example of the scaling pattern called Different Data Same Function (DDSF), also known as Distributed Data Parallel (DDP), the function in this diagram is the pretrained model, and the data is split and distributed as shards.

Different Data Same Function (DDSF)

Define a Python closure to load our pretrained model. This model is just a fake model that predicts whether a tip is warranted contingent on the number of passengers (2 or more) on collective rides. (Note: This prediction is fake. A real model would invoke model.predict(input_data). Yet it suffices for this example.)

import numpy as np
import pandas as pd

def load_trained_model():
    # A fake model that predicts whether tips were given based on the number of passengers in the taxi cab.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        # Some model weights and payload so Ray copies the model in the
        # shared plasma store for tasks scheduled across nodes.
        model.payload = np.arange(10, 100_000, dtype=float)
        model.cls = "regression"
        # Give a tip if 2 or more passengers.
        predict = batch["passenger_count"] >= 2
        return pd.DataFrame({"score": predict})
 
    return model

Let's define a Ray task that will handle each shard of the NYC taxi data:

import pyarrow.parquet as pq

@ray.remote
def make_model_batch_predictions(model, shard_path, verbose=False):
    if verbose:
        print(f"Batch inference for shard file: {shard_path}")
    df = pq.read_table(shard_path).to_pandas()
    result = model(df)
    # Return our prediction data frame
    return result

Get the 12 files, one per month, of the NYC taxi data:

# 12 files, one for each remote task.
input_files = [
    f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet" for i in range(12)]

Insert model into the object store

ray.put() the model just once into the local object store, and then pass the reference to the remote tasks. It would be highly inefficient to pass the model itself, as in make_model_batch_predictions.remote(model, file): to ship the model to the remote node, Ray would implicitly do a ray.put(model) for each task, potentially overwhelming the local object store and causing out-of-memory errors.

Instead, we will just pass a reference, and the node where the task is scheduled will de-reference it. This is the Ray core API for putting objects into the Ray Plasma store.

# Get the model
model = load_trained_model()
# Put the model object into the shared object store.
model_ref = ray.put(model)
# List for holding all object references returned from the model's predictions
result_refs = []
# Launch all prediction tasks. For each file create a Ray remote task to do a batch inference
for file in input_files:
    # Launch a prediction task by passing model reference and shard file to it.
    result_refs.append(make_model_batch_predictions.remote(model_ref, file))

Fetch the results and check predictions and output size:

results = ray.get(result_refs)
 
for r in results:
    print(f"Predictions dataframe size: {len(r)} | Total score for tips: {r['score'].sum()}")

Returns:

Predictions dataframe size: 141062 | Total score for tips: 46360
Predictions dataframe size: 133932 | Total score for tips: 42175
Predictions dataframe size: 144014 | Total score for tips: 45175
Predictions dataframe size: 143087 | Total score for tips: 45510
Predictions dataframe size: 148108 | Total score for tips: 47713
Predictions dataframe size: 141981 | Total score for tips: 45188
Predictions dataframe size: 136394 | Total score for tips: 43234
Predictions dataframe size: 136999 | Total score for tips: 45142
Predictions dataframe size: 139985 | Total score for tips: 44138
Predictions dataframe size: 156198 | Total score for tips: 49909
Predictions dataframe size: 142893 | Total score for tips: 46112
Predictions dataframe size: 145976 | Total score for tips: 48036

References for deep dives and more about Ray objects:

  1. Serialization
  2. Memory Management
  3. Object Spilling
  4. Fault Tolerance

Resources: