Fault Tolerance
To handle application-level failures, Ray provides mechanisms to catch errors, retry failed code, and handle misbehaving code. Ray also provides several mechanisms to automatically recover from internal system-level failures like node failures. In particular, Ray can automatically recover from some failures in the distributed object store.
There are several recommendations to make Ray applications fault tolerant.
Catch Exceptions Manually
First, if the fault tolerance mechanisms provided by Ray don’t work for you, you can always catch exceptions caused by failures and recover manually.
import ray

@ray.remote
class Actor:
    def read_only(self):
        import sys
        import random

        rand = random.random()
        if rand < 0.2:
            # Simulate an application-level error.
            return 2 / 0
        elif rand < 0.3:
            # Simulate the actor process exiting unexpectedly.
            sys.exit(1)
        return 2

actor = Actor.remote()
# Manually retry the actor task.
while True:
    try:
        print(ray.get(actor.read_only.remote()))
        break
    except ZeroDivisionError:
        pass
    except ray.exceptions.RayActorError:
        # Manually restart the actor
        actor = Actor.remote()
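The loop above retries indefinitely, which may not be what you want if the failure is permanent. As a minimal sketch, manual recovery can be bounded with a small helper. The names retry_call, max_attempts, base_delay_s and on_failure are hypothetical and not part of the Ray API, and the flaky function below only simulates a failing task so that the example runs without a cluster.

```python
import time


def retry_call(fn, retriable=(Exception,), max_attempts=5, base_delay_s=0.0, on_failure=None):
    """Call fn() until it succeeds or max_attempts calls have been made.

    Hypothetical helper, not a Ray API. Only exceptions listed in
    `retriable` trigger a retry; the last failure is re-raised.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retriable as exc:
            if attempt == max_attempts:
                # Out of attempts: surface the original error to the caller.
                raise
            if on_failure is not None:
                # Hook for recovery work, e.g. recreating a dead actor.
                on_failure(exc)
            # Exponential backoff between attempts (no wait when base_delay_s is 0).
            time.sleep(base_delay_s * 2 ** (attempt - 1))


# Stand-in for a remote call that fails twice before succeeding.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ZeroDivisionError("simulated transient failure")
    return 2


result = retry_call(flaky, retriable=(ZeroDivisionError,), max_attempts=5)
print(result, calls["count"])  # prints: 2 3
```

In a Ray program, fn would typically wrap the ray.get(...) call, retriable would list the exceptions you consider recoverable (for example ZeroDivisionError and ray.exceptions.RayActorError in the example above), and on_failure is where an actor could be recreated.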
Avoid Letting ObjectRefs Outlive Their Owners
Second, avoid letting an ObjectRef outlive its owner task or actor (the task or actor that creates the initial ObjectRef by calling ray.put() or foo.remote()).
As long as there are still references to an object, the owner worker of the object keeps running even after the corresponding task or actor finishes.
If the owner worker fails, Ray cannot recover the object automatically for those who try to access the object.
One example of creating such outlived objects is returning an ObjectRef created by ray.put() from a task:
import ray

# Non-fault tolerant version:
@ray.remote
def a():
    x_ref = ray.put(1)
    return x_ref

x_ref = ray.get(a.remote())
# Object x outlives its owner task A.
try:
    # If owner of x (i.e. the worker process running task A) dies,
    # the application can no longer get value of x.
    print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
    pass
In the above example, object x outlives its owner task a. If the worker process running task a fails, calling ray.get on x_ref afterwards will result in an OwnerDiedError exception.
A fault-tolerant version returns x directly, so that it is owned by the driver and is only accessed within the lifetime of the driver. If x is lost, Ray can automatically recover it via lineage reconstruction. See Anti-pattern: Returning ray.put() ObjectRefs from a task harms performance and fault tolerance for more details.
# Fault tolerant version:
@ray.remote
def a():
    # Here we return the value directly instead of calling ray.put() first.
    return 1

# The owner of x is the driver
# so x is accessible and can be auto recovered
# during the entire lifetime of the driver.
x_ref = a.remote()
print(ray.get(x_ref))
Avoid Custom Resource Requirements
Third, avoid using custom resource requirements that can only be satisfied by a particular node. If that particular node fails, the running tasks or actors cannot be retried.
@ray.remote
def b():
    return 1

# If the node with ip 127.0.0.3 fails while task b is running,
# Ray cannot retry the task on other nodes.
b.options(resources={"node:127.0.0.3": 1}).remote()
If you prefer running a task on a particular node, you can use the NodeAffinitySchedulingStrategy. It allows you to specify the affinity as a soft constraint so even if the target node fails, the task can still be retried on other nodes.
# Prefer running on the particular node specified by node id
# but can also run on other nodes if the target node fails.
b.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(), soft=True
    )
).remote()