Over-parallelizing with too fine-grained tasks harms speedup
TLDR: Where possible, strive to batch tiny Ray tasks into chunks to reap the benefits of distributing them.
Ray APIs are general and simple to use. As a result, newcomers' natural instinct is to parallelize all tasks, including tiny ones, which incurs overhead that adds up. In short, if the Ray remote tasks are tiny or minuscule in compute, they may take longer to execute than their serial Python equivalents.
Example
Take this small task:
import time

# A regular Python function that returns double its input
def tiny_task(x):
    time.sleep(0.00001)
    return 2 * x
We can run this as a regular sequential Python task:
start_time = time.time()
results = [tiny_task(x) for x in range(100000)]
end_time = time.time()
print(f"Ordinary function call takes {end_time - start_time:.2f} seconds")
Which returns:
Ordinary function call takes 151.73 seconds
Instead, let's convert this into a Ray remote task:
import ray

@ray.remote
def remote_tiny_task(x):
    time.sleep(0.00001)
    return 2 * x
start_time = time.time()
result_ids = [remote_tiny_task.remote(x) for x in range(100000)]
results = ray.get(result_ids)
end_time = time.time()
print(f"Parallelizing Ray tasks takes {end_time - start_time:.2f} seconds")
This returns:
Parallelizing Ray tasks takes 21.24 seconds
Surprisingly, the gain is modest: about 7x in this run. In certain scenarios, the Ray program is actually much slower than the sequential program!
What's going on?
Well, the issue here is that every task invocation has a non-trivial overhead (e.g., scheduling, inter-process communication, updating the system state), and this overhead dominates the actual time it takes to execute the task.
What can we do to remedy it?
One way to mitigate this is to make the remote tasks "larger" in order to amortize the invocation overhead. This is achieved by aggregating the tiny tasks into bigger chunks of 1000 each.
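To make the amortization argument concrete, here is a minimal sketch of the arithmetic. The 0.5 ms per-task overhead is an illustrative assumption, not a measured Ray figure, and `overhead_fraction` is a hypothetical helper introduced only for this illustration. The per-item work uses the nominal sleep duration from `tiny_task`.

```python
def overhead_fraction(per_task_overhead_s, per_item_work_s, items_per_task):
    """Return the fraction of one task's run time spent on invocation overhead."""
    useful_work_s = per_item_work_s * items_per_task
    return per_task_overhead_s / (per_task_overhead_s + useful_work_s)


# ASSUMPTION: illustrative overhead per remote task, not a measured Ray number.
ASSUMED_OVERHEAD_S = 0.0005
# Nominal sleep duration used by tiny_task in the example.
WORK_PER_ITEM_S = 0.00001

for items in (1, 10, 100, 1000):
    frac = overhead_fraction(ASSUMED_OVERHEAD_S, WORK_PER_ITEM_S, items)
    print(f"{items:>5} items per task: {frac:.1%} of task time is overhead")
```

Under these assumed numbers, a one-item task spends nearly all of its time on overhead, while a 1000-item task spends only a small fraction. The real ratio depends on the cluster and the workload, so it is worth measuring rather than relying on this model.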
Better approach: Use batching or chunking
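@ray.remote
def mega_work(start, end):
    # Run a whole chunk of tiny tasks inside a single remote task
    return [tiny_task(x) for x in range(start, end)]

start_time = time.time()
# Submit 100 remote tasks, each covering 1000 consecutive inputs
result_ids = [mega_work.remote(x * 1000, (x + 1) * 1000) for x in range(100)]
# Fetch the finished results (a list of 100 lists, one per chunk)
results = ray.get(result_ids)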
end_time = time.time()
print(f"Parallelizing Ray tasks as batches takes {end_time - start_time:.2f} seconds")
Which now returns:
Parallelizing Ray tasks as batches takes 10.54 seconds
That is a huge difference in execution time (about 14x faster than the sequential method, and about 2x faster than one remote task per item)! Breaking or restructuring many small tasks into batches or chunks of larger Ray remote tasks, as demonstrated above, achieves a significant performance gain.
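Two practical details follow from the batched version: the input size may not divide evenly by the chunk size, and the batched call returns one list per chunk rather than a flat list. The sketch below shows one way to handle both. `chunk_bounds` is a hypothetical helper, and `mega_work` here is a plain local stand-in (no Ray, no sleep) so the sketch runs without a cluster; with Ray, each `(start, end)` pair would be passed to `mega_work.remote` and the nested lists fetched with `ray.get` before flattening.

```python
from itertools import chain


def chunk_bounds(total, chunk_size):
    """Yield (start, end) pairs covering range(total); the last chunk may be shorter."""
    for start in range(0, total, chunk_size):
        yield start, min(start + chunk_size, total)


def tiny_task(x):
    return 2 * x


def mega_work(start, end):
    # Local stand-in for the @ray.remote version in the example.
    return [tiny_task(x) for x in range(start, end)]


bounds = list(chunk_bounds(100_000, 1000))
batched = [mega_work(start, end) for start, end in bounds]

# Flatten the per-chunk lists back into one list, preserving input order.
flat_results = list(chain.from_iterable(batched))
print(len(bounds), len(flat_results))
```

The chunk size of 1000 comes from the example; a different workload may favor a different size, which is best found by timing a few candidates.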