Remote Functions as Tasks

Remote Tasks

Ray enables arbitrary Python functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called 'tasks'. You can specify a task's resource requirements in terms of CPUs, GPUs, and custom resources. These resource requests are used by the cluster scheduler to distribute tasks across the cluster for parallelized execution.
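For instance, a minimal sketch (the function name and resource values here are illustrative, not part of the examples below):

import ray

# Request 2 CPUs and 1 GPU per invocation of this task; the scheduler
# will only place it on a node with those resources available.
# Custom resources can be requested the same way via resources={...}.
@ray.remote(num_cpus=2, num_gpus=1)
def train_step(batch):
    ...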

Transforming Python code into Ray Tasks, Actors, and Immutable Ray objects

Transforming Python functions into Ray Tasks

Tasks Parallel Pattern

Ray converts functions decorated with @ray.remote into stateless tasks that can be scheduled on any worker, on any node, in the cluster.

You don't have to worry about where tasks are executed on the cluster (on which node, by which worker process), nor do you have to reason about it: Ray takes care of all those details for you. You simply take your existing Python functions and convert them into distributed, stateless Ray Tasks.

Serial vs. Parallel Execution

Executed serially, as regular Python functions, tasks run sequentially, as shown in the diagram below. If you launch ten tasks, they run on a single worker, one after the other.

Timeline of sequential tasks, one after the other

Compared to serial execution, Ray tasks execute in parallel, scheduled on different workers. The Raylet schedules these tasks based on its scheduling policies.

Sample timeline with ten tasks running across 4 worker nodes in parallel.

Key Differences

There are a few key differences between an original Python function and the decorated one:

Invocation: The regular version is called with func_name(), whereas the remote Ray version is called with func_name.remote(). Keep this pattern in mind for all Ray remote execution methods.

Mode of execution and return values: A Python func_name() executes synchronously and returns the result of the function, whereas a Ray task func_name.remote() immediately returns an ObjectRef (a future) and then executes the task in the background on a remote worker process.

The result of the future is obtained by calling ray.get(ObjectRef) on the ObjectRef. This is a blocking function.
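A toy example of the pattern (the add function is illustrative):

import ray

@ray.remote
def add(a: int, b: int) -> int:
    return a + b

ref = add.remote(2, 3)  # returns an ObjectRef immediately; the task runs in the background
result = ray.get(ref)   # blocks until the task finishes; result == 5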

Examples

Let's look at two tasks running serially and then in parallel. For illustration, we'll use the following tasks:

  1. Computing the value of π using the Monte Carlo method
  2. Transforming and processing large high-resolution images

Monte Carlo simulation for estimating π

Let's estimate the value of π using a Monte Carlo method. We randomly sample points within a 2x2 square. We can use the proportion of the points that are contained within the unit circle centered at the origin to estimate the ratio of the area of the circle to the area of the square.

Since we know the true ratio to be π/4, we can multiply our estimated ratio by 4 to approximate the value of π. The more points we sample, the closer our approximation gets to the true value of π, to the required number of decimal places.
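In other words, the estimator is:

π ≈ 4 × (number of samples inside the circle) / (total number of samples)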

Estimating the value of π by sampling random points that fall into the circle.

First we define:

import os

# One sampling task per CPU core, 10 million samples per task
NUM_SAMPLING_TASKS = os.cpu_count()
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
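On the 16-core machine used for the runs below, this comes to 16 sampling tasks and 160 million samples in total.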

Serial Execution

Define a regular function, sampling_task, that computes the number of samples that fall inside the circle. It does this by randomly sampling num_samples points, with x and y each drawn uniformly from (-1, 1), and using the math.hypot function to check whether each point falls within the circle.

We also define a function run_serial that runs these tasks serially by launching NUM_SAMPLING_TASKS of them in a list comprehension, as well as calculate_pi, which sums the number of in-circle samples across all sampling tasks and computes π from that total.

import math
import random
from typing import List, Tuple

def sampling_task(num_samples: int, task_id: int, verbose=True) -> int:
    num_inside = 0
    for i in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        # check if the point is inside the unit circle
        if math.hypot(x, y) <= 1:
            num_inside += 1
    if verbose:
        print(f"Task id: {task_id} | Samples in the circle: {num_inside}")
    return num_inside
 
def run_serial(sample_size) -> List[int]:
    results = [sampling_task(sample_size, i+1) for i in range(NUM_SAMPLING_TASKS)]
    return results
 
def calculate_pi(results: List[int]) -> float:
    total_num_inside = sum(results)
    pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
    return pi

To run serially:

%%time
results = run_serial(NUM_SAMPLES_PER_TASK)
pi = calculate_pi(results)
print(f"Estimated value of π is: {pi:5f}")

which returns:

Task id: 1 | Samples in the circle: 7855069
Task id: 2 | Samples in the circle: 7854587
Task id: 3 | Samples in the circle: 7855707
Task id: 4 | Samples in the circle: 7853059
Task id: 5 | Samples in the circle: 7855625
Task id: 6 | Samples in the circle: 7854259
Task id: 7 | Samples in the circle: 7855365
Task id: 8 | Samples in the circle: 7852977
Task id: 9 | Samples in the circle: 7854524
Task id: 10 | Samples in the circle: 7853701
Task id: 11 | Samples in the circle: 7855502
Task id: 12 | Samples in the circle: 7854761
Task id: 13 | Samples in the circle: 7853941
Task id: 14 | Samples in the circle: 7852583
Task id: 15 | Samples in the circle: 7851961
Task id: 16 | Samples in the circle: 7853755
Estimated value of π is: 3.141684
CPU times: total: 1min 48s
Wall time: 1min 49s

Parallel Execution

Now we define a function to run sampling_task as a remote Ray task. Since it's decorated with @ray.remote, the task will run on a worker process, tied to a core, on the Ray cluster.
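If a Ray runtime isn't already running, a one-line local setup suffices (a minimal sketch; calling ray.init() with no address starts a local Ray instance):

import ray

ray.init()  # starts a local Ray runtime if we're not already connected to a cluster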

@ray.remote
def sample_task_distribute(sample_size, i) -> int:
    return sampling_task(sample_size, i)

def run_distributed(sample_size) -> List[int]:
    # Launch Ray remote tasks in a list comprehension; each returns immediately with a future ObjectRef.
    # Use ray.get to fetch the computed values; this blocks until every ObjectRef
    # is resolved, i.e. its value is materialized.
    results = ray.get([
            sample_task_distribute.remote(sample_size, i + 1) for i in range(NUM_SAMPLING_TASKS)
        ])
    return results
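As a variation (not used in the timings below), ray.wait() lets you consume results as individual tasks finish rather than blocking on the whole batch:

unfinished = [sample_task_distribute.remote(NUM_SAMPLES_PER_TASK, i + 1)
              for i in range(NUM_SAMPLING_TASKS)]
while unfinished:
    # ray.wait returns as soon as num_returns tasks have finished
    finished, unfinished = ray.wait(unfinished, num_returns=1)
    print(ray.get(finished[0]))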

To run distributed:

%%time
results = run_distributed(NUM_SAMPLES_PER_TASK)
pi = calculate_pi(results)
print(f"Estimated value of π is: {pi:5f}")

Which returns:

(sample_task_distribute pid=21064) Task id: 1 | Samples in the circle: 7854549
Estimated value of π is: 3.141554
CPU times: total: 15.6 ms
Wall time: 18.2 s

Almost 6x faster than serial execution!

Image transformation and computation

For this example, we will simulate a compute-intensive workload by transforming large high-resolution images and running computations on them. Tasks like these are common when preprocessing and augmenting images for DNN training, such as in image classification.

PyTorch's torchvision.transforms API provides many transformations. We will use a couple of them here, along with some numpy and torch.tensor operations. Our tasks will perform the following compute-intensive transformations:

  1. Use PIL APIs to blur the image with a filter of a given intensity
  2. Use torchvision's random trivial wide augmentation
  3. Convert the image into numpy arrays and torch tensors and perform numpy and torch tensor operations such as transpose and element-wise multiplication with random integers
  4. Perform more compute-intensive tensor exponentiation and multiplication with tensors

The goal is to compare the execution times of running these tasks serially vs. distributed as Ray Tasks.

High resolution images for transformation and computation

First we download the images to a local directory:

import os
from pathlib import Path

import requests
import tqdm

# borrowed URL ideas and heavily modified from
# https://analyticsindiamag.com/how-to-run-python-code-concurrently-using-multithreading/
URLS = [
    "https://images.pexels.com/photos/305821/pexels-photo-305821.jpeg",
    "https://images.pexels.com/photos/509922/pexels-photo-509922.jpeg",
    "https://images.pexels.com/photos/325812/pexels-photo-325812.jpeg",
    "https://images.pexels.com/photos/1252814/pexels-photo-1252814.jpeg",
    ...
]

def download_images(url: str, data_dir: str) -> None:
    """
    Given a URL and the image data directory, fetch the URL and save it in the data directory
    """
    img_data = requests.get(url).content
    img_name = url.split("/")[4]
    img_name = f"{data_dir}/{img_name}.jpg"
    with open(img_name, "wb+") as f:
        f.write(img_data)

# Check if the directory exists. If so, skip the download
# and assume it was done in a prior run.
DATA_DIR = Path(os.getcwd() + "/task_images")

if not os.path.exists(DATA_DIR):
    os.mkdir(DATA_DIR)
    print("downloading images ...")
    for url in tqdm.tqdm(URLS):
        download_images(url, DATA_DIR)

Then, to simulate compute-intensive image processing:

import numpy as np
import torch
import torchvision.transforms as T
from PIL import ImageFilter

def transform_image(img_ref: object, fetch_image=True, verbose=False):
    """
    This is a deliberately compute-intensive image transformation and tensor operation
    to simulate compute-intensive image processing
    """
    import ray

    # Only fetch the image from the object store if called serially.
    if fetch_image:
        img = ray.get(img_ref)
    else:
        img = img_ref
    before_shape = img.size

    # Blur the image with the specified intensity,
    # then use a torchvision transformation to augment it
    img = img.filter(ImageFilter.GaussianBlur(radius=20))
    augmentor = T.TrivialAugmentWide(num_magnitude_bins=31)
    img = augmentor(img)

    # Convert the image to a tensor and transpose it
    tensor = torch.tensor(np.asarray(img))
    t_tensor = torch.transpose(tensor, 0, 1)

    # compute-intensive operations on tensors
    random.seed(42)
    for _ in range(3):
        tensor.pow(3).sum()
        t_tensor.pow(3).sum()
        torch.mul(tensor, random.randint(2, 10))
        torch.mul(t_tensor, random.randint(2, 10))
        torch.mul(tensor, tensor)
        torch.mul(t_tensor, t_tensor)

    # Resize to a thumbnail; THUMB_SIZE is a (width, height) tuple defined elsewhere in the notebook
    img.thumbnail(THUMB_SIZE)
    after_shape = img.size
    if verbose:
        print(f"augmented: shape:{img.size}| image tensor shape:{tensor.size()} transpose shape:{t_tensor.size()}")

    return before_shape, after_shape

We define a function to run these transformation tasks serially, on a single node, single core:

def run_serially(img_list_refs: List) -> List[Tuple[int, float]]:
    transform_results = [transform_image(image_ref, fetch_image=True) for image_ref in tqdm.tqdm(img_list_refs)]
    return transform_results

As well as a Ray task that transforms, augments, and runs the same compute-intensive operations on an image, plus a function to run these transformation tasks distributed:

@ray.remote
def augment_image_distributed(image_ref: object, fetch_image) -> Tuple:
    return transform_image(image_ref, fetch_image=fetch_image)

def run_distributed(img_list_refs: List[object]) -> List[Tuple[int, float]]:
    return ray.get([augment_image_distributed.remote(img, False) for img in tqdm.tqdm(img_list_refs)])
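Note the fetch_image=False in the distributed path: when an ObjectRef is passed as a top-level argument to .remote(), Ray automatically resolves it to the underlying object before the task runs, so the worker receives the image itself and needs no explicit ray.get().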

Before we run our tests, we also define the batch sizes:

BATCHES = [10, 20, 30, 40, 50]

Serial Execution

We will iterate through the images in batches of 10, 20, 30, 40, and 50 and process them. To simulate a compute-intensive operation on images, we perform the tensor transformations and computations described above.

for idx in BATCHES:
    # Use the index to take the first N image references
    image_batch_list_refs = images_list_refs[:idx]
    print(f"\nRunning {len(image_batch_list_refs)} tasks serially....")
 
    # Run each one serially
    start = time.perf_counter()
    serial_results = run_serially(image_batch_list_refs)
    end = time.perf_counter()
    elapsed = end - start
 
    print(f"Serial transformations/computations of {len(image_batch_list_refs)} images: {elapsed:.2f} sec")

Distributed Execution

Populate our Ray object store

Since our images are large, we put them in the Ray distributed object store (we cover the Ray shared object store in more detail later). Below again is the diagram of workers in worker nodes using ray.put() to store values and ray.get() to retrieve them from each node's object store.

Diagram of workers in worker nodes using ray.put() to store values and using ray.get() to retrieve them from each node's object store.

We insert all the images into our object store and return the object references:

from PIL import Image

def insert_into_object_store(img_name: str):
    """
    Insert the image into the object store and return its object reference
    """
    import ray
    img = Image.open(img_name)
    img_ref = ray.put(img)
    return img_ref

# Place all images into the object store. Since Ray tasks may be distributed
# across machines, the DATA_DIR may not be present on a worker. However,
# placing the images in the Ray distributed object store makes them accessible
# to any remote task scheduled on any Ray worker.
image_list = list(DATA_DIR.glob("*.jpg"))
images_list_refs = [insert_into_object_store(image) for image in image_list]
images_list_refs[:2]

Which returns two example object references:

[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505),
 ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002e1f505)]

Execute distributed tasks

We iterate over BATCHES, launching a Ray task for each image within the processing batch:

for idx in BATCHES:
    image_batch_list_refs = images_list_refs[:idx]
    print(f"\nRunning {len(image_batch_list_refs)} tasks distributed....")
 
    # Run each one distributed
    start = time.perf_counter()
    distributed_results = run_distributed(image_batch_list_refs)
    end = time.perf_counter()
    elapsed = end - start
 
    print(f"Distributed transformations/computations of {len(image_batch_list_refs)} images: {elapsed:.2f} sec")

Comparison

We can clearly observe that the overall execution time with Ray tasks is on the order of 3-4x faster 🚅 than serial execution. Converting an existing serial compute-intensive Python function is as simple as adding the @ray.remote decorator to it, and Ray handles all the hard bits: scheduling, execution, scaling, memory management, etc.

As you can see, the benefits in execution time with Ray tasks are tangible.

Notes

A side note: Ray's benefits are more pronounced for bigger jobs. Ray can be used for parallel and distributed computing, but if you have a smaller job, you may be better off simply using Python's multiprocessing library:

Ray vs. Multiprocessing

On a machine with 48 physical cores, Ray is 9x faster than Python multiprocessing and 28x faster than single-threaded Python. Error bars are depicted, but in some cases are too small to see. For more info, read Parallelizing Python Code as well as 10x Faster Parallel Python Without Python Multiprocessing.
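For reference, a minimal multiprocessing sketch of the same fan-out-and-collect pattern (a toy example, not the benchmark code from the chart above):

from multiprocessing import Pool

def square(x: int) -> int:
    return x * x

if __name__ == "__main__":
    # Pool() defaults to os.cpu_count() worker processes
    with Pool() as pool:
        print(pool.map(square, range(10)))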