
Remote Stateful Classes

Overview

Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created or an existing worker is used. The methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.

Remote classes, just like remote tasks, use the @ray.remote decorator, applied here to a Python class.

The Ray Actor pattern is powerful. It allows you to take a Python class and instantiate it as a stateful microservice that can be queried from other actors and tasks, and even from other Python applications. Actors can be passed as arguments to other tasks and actors, so Ray Actors can serve as stateful distributed services.

Ray Actor

When you instantiate a remote Actor, Ray runs it in a dedicated worker process on a node in the cluster. That process becomes the Actor process, and its sole purpose is to run the methods called on the actor. Other Ray tasks and actors can invoke the actor's methods on that process, mutating its internal state if desired. Actors can also be terminated manually if needed.

Ray Actor 2

So let's look at some examples of Python classes converted into Ray Actors.

Method tracking for Actors

Say we want to keep track of how often a particular method is invoked in different Actors. This could be a telemetry use case: tracking which Actors are being used and which of their methods are invoked, or which Actor service's methods are accessed most frequently. The diagram shows driver code calling different Actor methods.

Driver code calling different Actor methods

Let's use Ray actors to track method invocations: each Ray actor instance will track how many times its methods were invoked. To do this, we define a base class ActorCls and two subclasses, ActorClsOne and ActorClsTwo.

import random
import time
from typing import Dict, Tuple

import ray

ray.init()


class ActorCls:
    def __init__(self, name: str):
        self.name = name
        # Per-method invocation counters
        self.method_calls = {"method": 0}

    # Ray uses __repr__ to label this actor's log lines. For example,
    # ActorClsOne.remote("ActorClsOne") logs with the prefix "ActorClsOne".
    def __repr__(self):
        return self.name

    def method(self, **kwargs) -> None:
        # Override this method in the subclass
        pass

    def get_all_method_calls(self) -> Tuple[str, Dict[str, int]]:
        return self.get_name(), self.method_calls

    def get_name(self) -> str:
        return self.name


@ray.remote
class ActorClsOne(ActorCls):

    def __init__(self, name: str):
        super().__init__(name)

    def method(self, **kwargs) -> None:
        # Simulate work; other kwargs (e.g. store) are ignored here
        time.sleep(kwargs["timeout"])

        # Update the respective counter
        self.method_calls["method"] += 1


@ray.remote
class ActorClsTwo(ActorCls):

    def __init__(self, name: str):
        super().__init__(name)

    def method(self, **kwargs) -> None:
        # Simulate work; other kwargs (e.g. store) are ignored here
        time.sleep(kwargs["timeout"])

        # Update the respective counter
        self.method_calls["method"] += 1

Make random calls to Actors

This is our driver, which uses the two Actors we defined and randomly calls each Actor's method. An actor instance is created with actor_instance = class_name.remote(args), where args are the arguments to the actor class constructor. To invoke an actor's method, you simply use actor_instance.method_name.remote(args). For our case, let's create one instance of each class.

actor_one = ActorClsOne.remote("ActorClsOne")
actor_two = ActorClsTwo.remote("ActorClsTwo")
# Names of the actor classes
CALLERS_NAMES = ["ActorClsOne", "ActorClsTwo"]
# A dictionary of actor handles keyed by the class name
CALLERS_CLS_DICT = {"ActorClsOne": actor_one,
                    "ActorClsTwo": actor_two}

Iterate over the number of classes; in each round, make 15 calls, each to the method of a randomly chosen subclass Actor, while keeping a local count for verification.

count_dict = {"ActorClsOne": 0, "ActorClsTwo": 0}

for _ in range(len(CALLERS_NAMES)):
    for _ in range(15):
        name = random.choice(CALLERS_NAMES)
        count_dict[name] += 1
        # Fire-and-forget calls: each actor queues and runs its own invocations
        if name == "ActorClsOne":
            CALLERS_CLS_DICT[name].method.remote(timeout=1, store="mongo_db")
        else:
            CALLERS_CLS_DICT[name].method.remote(timeout=1.5, store="delta")

    print(f"State of counts in this execution: {count_dict}")
    time.sleep(0.5)

Which returns:

State of counts in this execution: {'ActorClsOne': 5, 'ActorClsTwo': 10}
State of counts in this execution: {'ActorClsOne': 13, 'ActorClsTwo': 17}

Fetch the number of method calls each Actor has recorded so far:

print(ray.get([CALLERS_CLS_DICT[name].get_all_method_calls.remote() for name in CALLERS_NAMES]))

Returns:

    [('ActorClsOne', {'method': 13}), ('ActorClsTwo', {'method': 17})]

Note that we did not have to reason about where and how the actors are scheduled, or worry about socket connections or the IP addresses where these actors reside. All of that is abstracted away and handled by Ray. All we did was write Python code and use the Ray Core APIs to convert our classes into distributed stateful services!

Look at the Ray Dashboard

We can see the Actors running as processes on the worker nodes. The Actors page also lets us view more metrics and data on individual Ray Actors:

Ray Dashboard

Overall, we saw how you can use Actors to keep track of how many times their methods were invoked. This could be useful for collecting telemetry on how Actors deployed as services are used.

Use Actor to keep track of progress

In Remote Tasks, we explored how to approximate the value of π using only tasks. In this example, we extend it by defining a Ray actor that our sampling tasks call to update their progress. The sampling Ray tasks send a message (via a method call) to the Ray actor to update progress.

Estimating the value of π by sampling random points that fall into the circle.
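For reference, the estimator that the sampling tasks parallelize needs no Ray at all. A serial sketch, where estimate_pi is a hypothetical helper and the seed parameter is added only to make runs repeatable:

```python
import math
import random


def estimate_pi(num_samples: int, seed: int = 0) -> float:
    """Hypothetical serial Monte Carlo estimate of pi."""
    rng = random.Random(seed)
    num_inside = 0
    for _ in range(num_samples):
        # Random point in the square [-1, 1] x [-1, 1]
        x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    # The fraction of points inside the unit circle approximates pi / 4
    return 4 * num_inside / num_samples


print(estimate_pi(100_000))
```

The Ray version splits this loop across NUM_SAMPLING_TASKS tasks and sums their num_inside counts; the actor adds progress reporting on top.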

Defining our progress Actor

Let's define a Ray actor that does the following:

  • keeps track of each task ID and the number of samples it has completed
  • can be called (or sent a message) by sampling tasks to update progress
import math
import os
import random
import time

import ray


@ray.remote
class ProgressPIActor:
    def __init__(self, total_num_samples: int):
        # Total number of samples across all the tasks
        self.total_num_samples = total_num_samples
        # Dict mapping each task id to the number of samples it has completed
        self.num_samples_completed_per_task = {}

    def report_progress(self, task_id: int, num_samples_completed: int) -> None:
        # Record (overwrite) the samples completed so far for this task id
        self.num_samples_completed_per_task[task_id] = num_samples_completed

    def get_progress(self) -> float:
        # Ratio of samples completed so far to the total number of samples
        # (total_num_samples == num_of_tasks * num_samples_per_task)
        return (
            sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
        )

Defining the Sampling Task

As before in our task tutorial, we define a Ray task that draws num_samples random points and returns the number of them that fall inside the circle. The frequency_report parameter sets how often the task reports progress: every frequency_report samples, it updates its task_id's entry in the progress actor.

@ray.remote
def sampling_task(num_samples: int, task_id: int,
                  progress_actor: ray.actor.ActorHandle,
                  frequency_report: int = 1_000_000) -> int:
    num_inside = 0
    for i in range(num_samples):
        # Random point (x, y) in the square [-1, 1] x [-1, 1] that bounds the unit circle
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
 
        # Report progress every frequency_report samples.
        if (i + 1) % frequency_report == 0:
            # Send a message or call the actor method.
            # This is asynchronous.
            progress_actor.report_progress.remote(task_id, i + 1)
 
    # Report the final progress.
    progress_actor.report_progress.remote(task_id, num_samples)
 
    # Return the total number of samples inside our circle
    return num_inside
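With the values used in this tutorial (NUM_SAMPLES_PER_TASK = 10_000_000, SAMPLE_REPORT_FREQUENCY = 1_000_000), each sampling_task sends 11 reports, and the last periodic report and the final report carry the same count. Because report_progress overwrites a task's entry rather than adding to it, the duplicate does no harm. A plain-Python sketch (no Ray; report_schedule and the 4-task setup are hypothetical) that replays the reports into a dictionary shaped like num_samples_completed_per_task:

```python
def report_schedule(num_samples: int, frequency_report: int) -> list:
    """Hypothetical helper: the counts one sampling_task passes to report_progress."""
    # Periodic reports: each i + 1 with (i + 1) % frequency_report == 0,
    # i.e. every multiple of frequency_report up to num_samples.
    reports = list(range(frequency_report, num_samples + 1, frequency_report))
    # Final report sent after the loop
    reports.append(num_samples)
    return reports


NUM_TASKS = 4  # hypothetical task count
schedule = report_schedule(10_000_000, 1_000_000)
print(len(schedule), schedule[-2:])  # 11 [10000000, 10000000]

# Replay every report, overwriting each task's entry as report_progress does.
completed = {}
for task_id in range(NUM_TASKS):
    for count in schedule:
        completed[task_id] = count

progress = sum(completed.values()) / (NUM_TASKS * 10_000_000)
print(progress)  # 1.0
```

Had report_progress accumulated the reported counts instead, the same schedule would push the progress far past 100%.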

Defining some tunable parameters

These values can be changed for experimentation.

  • NUM_SAMPLING_TASKS - you can scale this depending on the number of CPUs in your cluster.
  • NUM_SAMPLES_PER_TASK - increase or decrease the number of samples per task to see how it affects the accuracy of π
  • SAMPLE_REPORT_FREQUENCY - the number of samples a sampling Ray task processes between progress reports
NUM_SAMPLING_TASKS = os.cpu_count()
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
SAMPLE_REPORT_FREQUENCY = 1_000_000
# Create the progress actor.
progress_actor = ProgressPIActor.remote(TOTAL_NUM_SAMPLES)

Executing Sampling Tasks in parallel

Using a list comprehension, we launch NUM_SAMPLING_TASKS Ray remote tasks, each sampling NUM_SAMPLES_PER_TASK data points. Notice that we pass the progress actor's handle as a parameter to each Ray task.

# Create and execute all sampling tasks in parallel.
# This returns a list of ObjectRefs, one per task.
# Each ObjectRef refers to the number of points that task found inside the circle.
results = [
    sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor, frequency_report=SAMPLE_REPORT_FREQUENCY)
    for i in range(NUM_SAMPLING_TASKS)
]

Calling the Progress Actor

While the tasks are executing asynchronously, let's check how they are progressing using our Ray Actor.

# Query progress periodically.
while True:
    progress = ray.get(progress_actor.get_progress.remote())
    print(f"Progress: {int(progress * 100)}%")
    if progress == 1:
        break
    time.sleep(1)

Which returns:

Progress: 0%
Progress: 0%
Progress: 10%
Progress: 11%
Progress: 20%
Progress: 26%
Progress: 30%
Progress: 38%
Progress: 40%
Progress: 48%
Progress: 50%
Progress: 59%
Progress: 62%
Progress: 70%
Progress: 72%
Progress: 77%
Progress: 82%
Progress: 86%
Progress: 92%
Progress: 97%
Progress: 100%

Calculating π

As before, π is estimated as total_num_inside * 4 / TOTAL_NUM_SAMPLES, that is, four times the fraction of all samples that landed inside the circle.
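The factor of 4 comes from comparing areas. The samples are drawn uniformly from the square [-1, 1] × [-1, 1] of side 2r with r = 1, so the fraction that lands inside the circle approximates the ratio of the circle's area to the square's. Writing N_inside for total_num_inside and N_total for TOTAL_NUM_SAMPLES (shorthand introduced here):

```latex
\frac{N_{\text{inside}}}{N_{\text{total}}}
  \approx \frac{\pi r^2}{(2r)^2}
  = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx \frac{4\, N_{\text{inside}}}{N_{\text{total}}}
```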

# Get all the sampling tasks results.
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")

Which returns:

Estimated value of π is: 3.1417635

Summary

Ray Actors are stateful, and their methods can be invoked to pass messages or to alter the internal state of the class. Each actor is scheduled on a dedicated worker process on a Ray node, so all of the actor's methods are executed on that particular worker process. In the first example, we saw how you can use Actors to keep track of how many times their methods were invoked, which could be useful for collecting telemetry on Actors deployed as services. In the second, we demonstrated how you can use an Actor to track the progress of Ray tasks; in our case, tasks approximating the value of π.

Next, let's explore how Actors can be used to write more complex distributed applications using the Ray Actor Tree pattern.

