Remote Tasks
Ray enables arbitrary Python functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called "tasks". You can specify a task's resource requirements in terms of CPUs, GPUs, and custom resources, and the cluster scheduler uses these resource requests to distribute tasks across the cluster for parallelized execution.
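For example, resource requirements can be attached directly to the decorator. Here is a minimal sketch; the function and the values are only illustrative, not part of this chapter's examples:
import ray

@ray.remote(num_cpus=2, num_gpus=0.5)  # hypothetical requirements for illustration
def heavy_task(x):
    # The scheduler will only place this task on a node with
    # 2 free CPUs and half a GPU available.
    return x * 2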
Transforming Python code into Ray Tasks, Actors, and immutable Ray objects:
Transforming a Python function into a Ray Task:
Tasks Parallel Pattern
Ray converts functions decorated with @ray.remote into stateless tasks that can be scheduled on any worker in the cluster.
You don't have to worry about where these tasks are executed on the cluster (on which node, by which worker process), nor do you have to reason about it; all that burden is Ray's job. You simply take your existing Python functions and convert them into distributed, stateless Ray Tasks.
Serial vs. Parallel Execution
Serial tasks, as regular Python functions, execute sequentially, as shown in the diagram below. If I launch ten tasks, they will run on a single worker, one after the other.
Compared to serial execution, Ray tasks execute in parallel, scheduled on different workers. The raylet schedules these tasks based on its scheduling policies.
Key Differences
There are a few key differences between an original Python function and the decorated one:
- Invocation: The regular version is called with func_name(), whereas the remote Ray version is called with func_name.remote(). Keep this pattern in mind for all Ray remote execution methods.
- Mode of execution and return values: A Python func_name() executes synchronously and returns the result of the function, whereas a Ray task func_name.remote() immediately returns an ObjectRef (a future) and then executes the task in the background on a remote worker process. The result of the future is obtained by calling ray.get(ObjectRef) on the ObjectRef; this is a blocking call.
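A minimal sketch of this contrast (square is a hypothetical function used only for illustration):
import ray

@ray.remote
def square(x):
    return x * x

# func_name.remote() returns an ObjectRef immediately...
ref = square.remote(4)

# ...and ray.get() blocks until the task's result is materialized.
assert ray.get(ref) == 16

# Note: square(4) would raise an error here; once decorated,
# a function must be invoked with .remote().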
Examples
Let's look at two tasks running serially and then in parallel. For illustration, we'll use the following tasks:
- Computing value of pi using the monte carlo method
- Transforming and processing large high-resolution images
Monte Carlo simulation for estimating π
Let's estimate the value of π using a Monte Carlo method. We randomly sample points within a 2x2 square. We can use the proportion of the points that fall within the unit circle centered at the origin to estimate the ratio of the area of the circle to the area of the square.
Since the circle has area π and the square has area 4, we know the true ratio to be π/4, so we can multiply our estimated ratio by 4 to approximate the value of π. The more points we sample for this approximation, the closer we get to the true value of π to the required number of decimal places.
First we define our imports and constants:
import os
import math
import random
from typing import List, Tuple

import ray

NUM_SAMPLING_TASKS = os.cpu_count()
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
Serial Execution
Define a regular function, sampling_task, that computes the number of samples that land inside the circle. This is done by randomly sampling num_samples points (x, y), each coordinate drawn uniformly from (-1, 1). Using the math.hypot function, we check whether each point falls within the unit circle.
We also define a function run_serial to run this serially by launching NUM_SAMPLING_TASKS tasks in a list comprehension, as well as calculate_pi to calculate the value of π by summing the number of samples inside the circle across all sampling tasks and computing pi = 4 * total_num_inside / TOTAL_NUM_SAMPLES.
def sampling_task(num_samples: int, task_id: int, verbose=True) -> int:
    num_inside = 0
    for i in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        # check if the point is inside the circle
        if math.hypot(x, y) <= 1:
            num_inside += 1
    if verbose:
        print(f"Task id: {task_id} | Samples in the circle: {num_inside}")
    return num_inside

def run_serial(sample_size) -> List[int]:
    results = [sampling_task(sample_size, i + 1) for i in range(NUM_SAMPLING_TASKS)]
    return results

def calculate_pi(results: List[int]) -> float:
    total_num_inside = sum(results)
    pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
    return pi
Running serially we do:
%%time
results = run_serial(NUM_SAMPLES_PER_TASK)
pi = calculate_pi(results)
print(f"Estimated value of π is: {pi:5f}")
which returns:
Task id: 1 | Samples in the circle: 7855069
Task id: 2 | Samples in the circle: 7854587
Task id: 3 | Samples in the circle: 7855707
Task id: 4 | Samples in the circle: 7853059
Task id: 5 | Samples in the circle: 7855625
Task id: 6 | Samples in the circle: 7854259
Task id: 7 | Samples in the circle: 7855365
Task id: 8 | Samples in the circle: 7852977
Task id: 9 | Samples in the circle: 7854524
Task id: 10 | Samples in the circle: 7853701
Task id: 11 | Samples in the circle: 7855502
Task id: 12 | Samples in the circle: 7854761
Task id: 13 | Samples in the circle: 7853941
Task id: 14 | Samples in the circle: 7852583
Task id: 15 | Samples in the circle: 7851961
Task id: 16 | Samples in the circle: 7853755
Estimated value of π is: 3.141684
CPU times: total: 1min 48s
Wall time: 1min 49s
Parallel Execution
Now we define a function to run sampling_task as a remote Ray task. Since it's decorated with @ray.remote, the task will run on a worker process, tied to a core, on the Ray cluster.
@ray.remote
def sample_task_distribute(sample_size, i) -> int:
    return sampling_task(sample_size, i)

def run_distributed(sample_size) -> List[int]:
    # Launch Ray remote tasks in a list comprehension; each returns immediately
    # with a future ObjectRef. Use ray.get to fetch the computed values; this
    # will block until every ObjectRef is resolved, i.e. its value is materialized.
    results = ray.get([
        sample_task_distribute.remote(sample_size, i + 1) for i in range(NUM_SAMPLING_TASKS)
    ])
    return results
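Note that running remote tasks assumes a Ray runtime has already been started (for instance, earlier in the notebook). A minimal sketch:
import ray

# Start (or connect to) a Ray runtime; with no arguments this
# launches a local cluster using all available cores.
if not ray.is_initialized():
    ray.init()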
Running distributed we do:
%%time
results = run_distributed(NUM_SAMPLES_PER_TASK)
pi = calculate_pi(results)
print(f"Estimated value of π is: {pi:5f}")
which returns:
(sample_task_distribute pid=21064) Task id: 1 | Samples in the circle: 7854549
Estimated value of π is: 3.141554
CPU times: total: 15.6 ms
Wall time: 18.2 s
Almost 6x faster than serial execution!
Image transformation and computation
For this example, we will simulate a compute-intensive task by transforming and computing some operations on large high-resolution images. Tasks like these are common in image classification pipelines, where images are transformed and augmented before being fed to a DNN for training.
The PyTorch torchvision.transforms API provides many transformation APIs. We will use a couple here, along with some numpy and torch.tensor operations. Our tasks will perform the following compute-intensive transformations:
- Use PIL APIs to blur the image with a specified filter intensity
- Use the Torchvision TrivialAugmentWide augmentation
- Convert the image into a numpy array and torch tensors, and perform numpy and torch tensor operations such as transpose and element-wise multiplication with random integers
- Do more compute-intensive tensor exponentiation (power) and multiplication with tensors
The goal is to compare execution times running these tasks serially vs. distributed as Ray Tasks.
First we download the images to a local directory:
# borrowed URL ideas, heavily modified from
# https://analyticsindiamag.com/how-to-run-python-code-concurrently-using-multithreading/
import os
from pathlib import Path

import requests
import tqdm

URLS = [
    "https://images.pexels.com/photos/305821/pexels-photo-305821.jpeg",
    "https://images.pexels.com/photos/509922/pexels-photo-509922.jpeg",
    "https://images.pexels.com/photos/325812/pexels-photo-325812.jpeg",
    "https://images.pexels.com/photos/1252814/pexels-photo-1252814.jpeg",
    ...
]

def download_images(url: str, data_dir: str) -> None:
    """
    Given a URL and the image data directory, fetch the URL and save it in the data directory.
    """
    img_data = requests.get(url).content
    img_name = url.split("/")[4]
    img_name = f"{data_dir}/{img_name}.jpg"
    with open(img_name, "wb+") as f:
        f.write(img_data)

# Check if the directory exists. If so, skip the download
# and assume it was done in a prior run.
DATA_DIR = Path(os.getcwd() + "/task_images")
if not os.path.exists(DATA_DIR):
    os.mkdir(DATA_DIR)
    print("downloading images ...")
    for url in tqdm.tqdm(URLS):
        download_images(url, DATA_DIR)
Then, to simulate compute-intensive image processing:
import random

import numpy as np
import torch
import torchvision.transforms as T
from PIL import ImageFilter

# Thumbnail size for the resized images. This value is an assumption;
# the original defines THUMB_SIZE elsewhere in the notebook.
THUMB_SIZE = (64, 64)

def transform_image(img_ref: object, fetch_image=True, verbose=False):
    """
    This is a deliberately compute-intensive image transformation and tensor
    operation to simulate compute-intensive image processing.
    """
    import ray

    # Only fetch the image from the object store if called serially.
    if fetch_image:
        img = ray.get(img_ref)
    else:
        img = img_ref
    before_shape = img.size

    # Blur the image with the specified intensity, then
    # use a torchvision transformation to augment it.
    img = img.filter(ImageFilter.GaussianBlur(radius=20))
    augmentor = T.TrivialAugmentWide(num_magnitude_bins=31)
    img = augmentor(img)

    # Convert the image to a tensor and transpose it.
    tensor = torch.tensor(np.asarray(img))
    t_tensor = torch.transpose(tensor, 0, 1)

    # Compute-intensive operations on tensors.
    random.seed(42)
    for _ in range(3):
        tensor.pow(3).sum()
        t_tensor.pow(3).sum()
        torch.mul(tensor, random.randint(2, 10))
        torch.mul(t_tensor, random.randint(2, 10))
        torch.mul(tensor, tensor)
        torch.mul(t_tensor, t_tensor)

    # Resize to a thumbnail.
    img.thumbnail(THUMB_SIZE)
    after_shape = img.size
    if verbose:
        print(f"augmented: shape:{img.size} | image tensor shape:{tensor.size()} transpose shape:{t_tensor.size()}")

    return before_shape, after_shape
We define a function to run these transformation tasks serially, on a single node, single core:
def run_serially(img_list_refs: List) -> List[Tuple[int, float]]:
    transform_results = [transform_image(image_ref, fetch_image=True) for image_ref in tqdm.tqdm(img_list_refs)]
    return transform_results
As well as a Ray task to transform, augment, and run the same compute-intensive operations on an image, and a function to run these transformation tasks distributed:
@ray.remote
def augment_image_distributed(image_ref: object, fetch_image) -> List[object]:
    return transform_image(image_ref, fetch_image=fetch_image)

def run_distributed(img_list_refs: List[object]) -> List[Tuple[int, float]]:
    return ray.get([augment_image_distributed.remote(img, False) for img in tqdm.tqdm(img_list_refs)])
Before we run our tests, we also define a constant:
BATCHES = [10, 20, 30, 40, 50]
Serial Execution
We will iterate through the images in increasing batch sizes (10, 20, 30, 40, 50) and process them. To simulate a compute-intensive operation on images, we perform the tensor transformations and computations described above. Note that the images are first placed in the Ray object store (shown in the next section), so each serial call fetches its image with ray.get.
import time

for idx in BATCHES:
    # Use the index to get N number of URLs to images
    image_batch_list_refs = images_list_refs[:idx]
    print(f"\nRunning {len(image_batch_list_refs)} tasks serially....")

    # Run each batch serially
    start = time.perf_counter()
    serial_results = run_serially(image_batch_list_refs)
    end = time.perf_counter()
    elapsed = end - start
    print(f"Serial transformations/computations of {len(image_batch_list_refs)} images: {elapsed:.2f} sec")
Distributed Execution
Populate our Ray object store
Since our images are large, we put them in the Ray distributed object store (we cover the Ray shared object store in more detail later). Below again is the diagram of workers on worker nodes using ray.put() to store values and ray.get() to retrieve them from each node's object store.
We insert all the images into our object store and return the object references:
from PIL import Image

def insert_into_object_store(img_name: str):
    """
    Insert the image into the object store and return its object reference.
    """
    import ray

    img = Image.open(img_name)
    img_ref = ray.put(img)
    return img_ref

# Place all images into the object store. Since Ray tasks may be distributed
# across machines, the DATA_DIR may not be present on a worker. However,
# placing the images in the Ray distributed object store makes them accessible
# to any remote task scheduled on a Ray worker.
image_list = list(DATA_DIR.glob("*.jpg"))
images_list_refs = [insert_into_object_store(image) for image in image_list]
images_list_refs[:2]
which displays the first two object references:
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505),
ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002e1f505)]
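As a quick sanity check (a sketch, assuming the images were stored as PIL images as above), any of these references can be resolved back into its image with ray.get:
# Resolve the first reference back into the PIL image it stores.
img = ray.get(images_list_refs[0])
print(img.size)  # (width, height) of the original image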
Execute distributed tasks
We iterate over BATCHES, launching a Ray task for each image within the processing batch:
for idx in BATCHES:
    image_batch_list_refs = images_list_refs[:idx]
    print(f"\nRunning {len(image_batch_list_refs)} tasks distributed....")

    # Run each batch distributed
    start = time.perf_counter()
    distributed_results = run_distributed(image_batch_list_refs)
    end = time.perf_counter()
    elapsed = end - start
    print(f"Distributed transformations/computations of {len(image_batch_list_refs)} images: {elapsed:.2f} sec")
Comparison
We can clearly observe that the overall execution times with Ray tasks are on the order of 3-4x faster 🚅 than serial execution. Converting an existing compute-intensive serial Python function is as simple as adding the @ray.remote decorator to it, and Ray handles all the hard bits: scheduling, execution, scaling, memory management, and so on.
As you can see, the benefits are tangible in the execution times with Ray tasks.
Notes
A side note: Ray's benefits are more pronounced for bigger jobs. Ray can be used for parallel and distributed computing, but if you have a smaller job, you may be better off simply using Python's multiprocessing library:
On a machine with 48 physical cores, Ray is 9x faster than Python multiprocessing and 28x faster than single-threaded Python. Error bars are depicted, but in some cases are too small to see. For more info, read Parallelizing Python Code as well as 10x Faster Parallel Python Without Python Multiprocessing.
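For comparison, a minimal multiprocessing sketch of the earlier sampling workload (run_multiprocessing is a hypothetical helper, reusing the sampling_task function defined above):
from multiprocessing import Pool

def run_multiprocessing(sample_size) -> List[int]:
    # One worker process per sampling task; starmap blocks until all tasks finish.
    with Pool(processes=NUM_SAMPLING_TASKS) as pool:
        return pool.starmap(
            sampling_task,
            [(sample_size, i + 1) for i in range(NUM_SAMPLING_TASKS)],
        )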