Remote Classes as Actors: Tree of Actors

Ray offers a number of scaling design patterns for tasks and actors. These patterns allow you to write distributed applications. One of them, commonly used in Ray libraries to scale workloads, is the tree of actors pattern.

Tree of Actors Pattern

This pattern is primarily used in the Ray libraries Ray Tune, Ray Train, and RLlib to train models in parallel or to conduct distributed hyperparameter optimization (HPO).

In a tree of actors pattern, a collection of workers, implemented as Ray actors (or in some cases as Ray tasks), is managed by a supervisor actor. For example, you may want to train multiple models at the same time, each with a different ML algorithm, while being able to inspect each model's state during training. As a methodology for rapidly examining simple baseline models, this pattern helps machine learning engineers quickly build a set of baselines for comparison.

Common tree of actors pattern, with a supervisor actor launching worker actors

This pattern facilitates the Same Data Different Function/Model (SDDF) scaling pattern. SDDF is popular in AutoML scenarios, where you may want to train different models, each with its own algorithm, at the same time on the same dataset.

Same data different function/model is a common scaling pattern

Supervisor and worker actor pattern

Suppose we want to train several small regression models, each with a different machine learning algorithm, on the same dataset to create a set of baseline models for comparison. In our case, we use the scikit-learn California housing dataset and train three regressors: a decision tree, a random forest, and an XGBoost regressor. Each model is trained and then evaluated by its mean squared error (MSE).
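
As a reminder of what the evaluation metric measures, MSE is the mean of the squared differences between the true and the predicted values. The following is a minimal pure-Python sketch; the tutorial itself uses scikit-learn's mean_squared_error, and the helper name and sample numbers here are made up for illustration.

```python
def mean_squared_error_py(y_true, y_pred):
    """Average of the squared residuals; lower is better."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Residuals are 1, -2 and 0, so the MSE is (1 + 4 + 0) / 3.
mse_example = mean_squared_error_py([3.0, 5.0, 2.0], [2.0, 7.0, 2.0])
print(round(mse_example, 4))
```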

To see this pattern used in production at scale, read Training One Million Machine Learning Models in Record Time with Ray. Another blog post that employs this pattern, training one model per value of a particular feature such as a zip code or a product SKU, is Many Models Batch Training at Scale with Ray Core.

MedInc  HouseAge  AveRooms  AveBedrms  Population  AveOccup  Latitude  Longitude
8.3252      41.0  6.984127   1.023810       322.0  2.555556     37.88    -122.23
8.3014      21.0  6.238137   0.971880      2401.0  2.109842     37.86    -122.22
7.2574      52.0  8.288136   1.073446       496.0  2.802260     37.85    -122.24
5.6431      52.0  5.817352   1.073059       558.0  2.547945     37.85    -122.25
3.8462      52.0  6.281853   1.081081       565.0  2.181467     37.85    -122.25

n_samples = 20640. The target is numeric and corresponds to the median house value of a district, in units of $100,000.

Generic model factory utility

This factory creates three different regression algorithms: random forest, decision tree, and XGBoost. Each one trains on the same scikit-learn California housing dataset and returns its MSE score, along with the time taken to train and the relevant parameters. Each model is in one of three states during its lifetime: INITIALIZED, RUNNING, or DONE. The final state is DONE.

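import time
from typing import Any, Dict

import ray
import xgboost as xgb
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

STATES = ["INITIALIZED", "RUNNING", "DONE"]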
DECISION_TREE_CONFIGS = {"max_depth": 10, "name": "decision_tree"}
RANDOM_FOREST_CONFIGS = {"n_estimators": 25, "name": "random_forest"}
XGBOOST_CONFIGS = {
    "max_depth": 10,
    "n_estimators": 25,
    "lr": 0.1,   # passed to XGBRegressor as learning_rate
    "eta": 0.3,  # note: in XGBoost, eta is an alias of learning_rate
    "colsample_bytree": 1,
    "name": "xgboost",
}
 
# Dataset shared by every worker actor
X_data, y_data = fetch_california_housing(return_X_y=True, as_frame=True)


class ActorCls:
    """
    Base class for our Ray actor worker models
    """
    def __init__(self, configs: Dict[Any, Any]) -> None:
        self.configs = configs
        self.name = configs["name"]
        self.state = STATES[0]
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X_data, y_data, test_size=0.2, random_state=4
        )
 
        self.model = None
 
    def get_name(self) -> str:
        return self.name
 
    def get_state(self) -> str:
        return self.state
 
    def train_and_evaluate_model(self) -> Dict[Any, Any]:
        """
        Override this method in each subclass
        """
        raise NotImplementedError
 
 
@ray.remote
class RFRActor(ActorCls):
    """
    An actor model to train and score the California housing data using Random Forest Regressor
    """
    def __init__(self, configs):
        super().__init__(configs)
        self.estimators = configs["n_estimators"]
 
    def train_and_evaluate_model(self) -> Dict[Any, Any]:
        """
        Train the model and evaluate and report MSE
        """
 
        self.model = RandomForestRegressor(
            n_estimators=self.estimators, random_state=42
        )
 
        print(
            f"Start training model {self.name} with estimators: {self.estimators} ..."
        )
 
        self.state = STATES[1]  # RUNNING: set before fitting starts
        start_time = time.time()
        self.model.fit(self.X_train, self.y_train)
        y_pred = self.model.predict(self.X_test)
        score = mean_squared_error(self.y_test, y_pred)
        self.state = STATES[2]
        end_time = time.time()
        print(
            f"End training model {self.name} with estimators: {self.estimators} took: {end_time - start_time:.2f} seconds"
        )
        return {
            "state": self.get_state(),
            "name": self.get_name(),
            "estimators": self.estimators,
            "mse": round(score, 4),
            "time": round(end_time - start_time, 2),
        }
 
 
@ray.remote
class DTActor(ActorCls):
    """
    An actor model to train and score the California housing data using Decision Tree Regressor
    """
 
    def __init__(self, configs):
        super().__init__(configs)
        self.max_depth = configs["max_depth"]
 
    def train_and_evaluate_model(self) -> Dict[Any, Any]:
        """
        Train the model and evaluate and report MSE
        """
 
        self.model = DecisionTreeRegressor(max_depth=self.max_depth, random_state=42)
 
        print(
            f"Start training model {self.name} with max depth: { self.max_depth } ..."
        )
 
        self.state = STATES[1]  # RUNNING: set before fitting starts
        start_time = time.time()
        self.model.fit(self.X_train, self.y_train)
        y_pred = self.model.predict(self.X_test)
        score = mean_squared_error(self.y_test, y_pred)
        self.state = STATES[2]
 
        end_time = time.time()
        print(
            f"End training model {self.name} with max_depth tree: {self.max_depth} took: {end_time - start_time:.2f} seconds"
        )
 
        return {
            "state": self.get_state(),
            "name": self.get_name(),
            "max_depth": self.max_depth,
            "mse": round(score, 4),
            "time": round(end_time - start_time, 2),
        }
 
 
@ray.remote
class XGBoostActor(ActorCls):
    """
    An actor model to train and score the California housing data using XGBoost Regressor
    """
 
    def __init__(self, configs):
        super().__init__(configs)
 
        self.max_depth = configs["max_depth"]
        self.estimators = configs["n_estimators"]
        self.colsample = configs["colsample_bytree"]
        self.eta = configs["eta"]
        self.lr = configs["lr"]
 
    def train_and_evaluate_model(self) -> Dict[Any, Any]:
        """
        Train the model and evaluate and report MSE
        """
 
        self.model = xgb.XGBRegressor(
            objective="reg:squarederror",
            colsample_bytree=self.colsample,
            eta=self.eta,
            learning_rate=self.lr,
            max_depth=self.max_depth,
            n_estimators=self.estimators,
            random_state=42,
        )
 
        print(
            f"Start training model {self.name} with estimators: {self.estimators} and max depth: { self.max_depth } ..."
        )
        self.state = STATES[1]  # RUNNING: set before fitting starts
        start_time = time.time()
        self.model.fit(self.X_train, self.y_train)
        y_pred = self.model.predict(self.X_test)
        score = mean_squared_error(self.y_test, y_pred)
        self.state = STATES[2]
 
        end_time = time.time()
        print(
            f"End training model {self.name} with estimators: {self.estimators} and max depth: { self.max_depth } and took: {end_time - start_time:.2f}"
        )
 
        return {
            "state": self.get_state(),
            "name": self.get_name(),
            "max_depth": self.max_depth,
            "mse": round(score, 4),
            "estimators": self.estimators,
            "time": round(end_time - start_time, 2),
        }
 
class ModelFactory:
    """
    Model factory to create different ML models
    """
    MODEL_TYPES = ["random_forest", "decision_tree", "xgboost"]
 
    @staticmethod
    def create_model(model_name: str) -> ray.actor.ActorHandle:
        if model_name not in ModelFactory.MODEL_TYPES:
            raise ValueError(f"{model_name} not supported")
        if model_name == "random_forest":
            configs = RANDOM_FOREST_CONFIGS
            return RFRActor.remote(configs)
        elif model_name == "decision_tree":
            configs = DECISION_TREE_CONFIGS
            return DTActor.remote(configs)
        else:
            configs = XGBOOST_CONFIGS
            return XGBoostActor.remote(configs)
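
One possible design variation: as the number of model types grows, the if/elif chain can be replaced by a lookup table that maps each model name to its class and configs. The sketch below illustrates the idea with plain Python classes so that it runs without Ray. `MODEL_REGISTRY`, `create_model_from_registry`, and the `Fake*` classes are hypothetical names; in the code above, the table would hold the actor classes and the call would be `actor_cls.remote(configs)`.

```python
from typing import Any, Dict, Tuple


class FakeDecisionTree:
    """Hypothetical stand-in for DTActor."""

    def __init__(self, configs: Dict[str, Any]) -> None:
        self.configs = configs


class FakeRandomForest:
    """Hypothetical stand-in for RFRActor."""

    def __init__(self, configs: Dict[str, Any]) -> None:
        self.configs = configs


# name -> (class to instantiate, configs to pass to it)
MODEL_REGISTRY: Dict[str, Tuple[type, Dict[str, Any]]] = {
    "decision_tree": (FakeDecisionTree, {"max_depth": 10, "name": "decision_tree"}),
    "random_forest": (FakeRandomForest, {"n_estimators": 25, "name": "random_forest"}),
}


def create_model_from_registry(model_name: str):
    try:
        model_cls, configs = MODEL_REGISTRY[model_name]
    except KeyError:
        raise ValueError(f"{model_name} not supported") from None
    return model_cls(configs)


model = create_model_from_registry("decision_tree")
print(type(model).__name__, model.configs)
```

With this layout, supporting a new algorithm means adding one registry entry rather than another branch.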

We also create an actor that serves as the supervisor. The supervisor creates three worker actors, each with its own ML algorithm and training function.

@ray.remote
class Supervisor:
    def __init__(self):
        # Create three Actor Workers, each by its unique model type and
        # their respective training function
        self.worker_models = [ModelFactory.create_model(name) for name in ModelFactory.MODEL_TYPES]
 
    def work(self):
        # do the train work for each Actor model
        results = [worker_model.train_and_evaluate_model.remote() for worker_model in self.worker_models]
        # Return the final results
        return ray.get(results)

Finally, we create the Supervisor actor, have it launch its workers, and fetch the final results.

supervisor = Supervisor.remote()
results = supervisor.work.remote()
values = ray.get(results)

Results in:

(XGBoostActor pid=23080) Start training model xgboost with estimators: 25 and max depth: 10 ...
(RFRActor pid=24080) Start training model random_forest with estimators: 25 ...
(DTActor pid=11388) Start training model decision_tree with max depth: 10 ...
(DTActor pid=11388) End training model decision_tree with max_depth tree: 10 took: 0.09 seconds
(XGBoostActor pid=23080) End training model xgboost with estimators: 25 and max depth: 10 and took: 0.89

Wait for the workers to finish by checking that each has returned its DONE state:

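# Wait for all models to finish. ray.get() above has already blocked until
# every worker returned, so this check passes on the first iteration.
while True:
    # Rebuild the list on each pass so entries do not accumulate
    states = [value["state"] for value in values]
    if all(state == "DONE" for state in states):
        break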

Returns:

(RFRActor pid=24080) End training model random_forest with estimators: 25 took: 3.39 seconds

And to view the results:

from operator import itemgetter
from pprint import pprint

sorted_by_mse = sorted(values, key=itemgetter('mse'))
print("\nResults from three training models sorted by MSE ascending order:")
pprint(sorted_by_mse)

Which prints:

Results from three training models sorted by MSE ascending order:
[{'estimators': 25,
  'mse': 0.2642,
  'name': 'random_forest',
  'state': 'DONE',
  'time': 3.39},
 {'estimators': 25,
  'max_depth': 10,
  'mse': 0.2711,
  'name': 'xgboost',
  'state': 'DONE',
  'time': 0.89},
 {'max_depth': 10,
  'mse': 0.4007,
  'name': 'decision_tree',
  'state': 'DONE',
  'time': 0.09}]
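
Because each worker returns a plain dictionary, picking a winner or weighing accuracy against training time takes only a few lines. The sketch below hard-codes the three result dictionaries printed above (trimmed to the keys it uses) so that it runs on its own.

```python
from operator import itemgetter

# Results copied from the run above, trimmed to the keys used here.
values = [
    {"name": "random_forest", "mse": 0.2642, "time": 3.39},
    {"name": "xgboost", "mse": 0.2711, "time": 0.89},
    {"name": "decision_tree", "mse": 0.4007, "time": 0.09},
]

best = min(values, key=itemgetter("mse"))
fastest = min(values, key=itemgetter("time"))
print(f"Lowest MSE: {best['name']} ({best['mse']})")
print(f"Fastest to train: {fastest['name']} ({fastest['time']} s)")
```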

We can view these actors (the Supervisor and each model actor: RFRActor, DTActor, XGBoostActor) running as processes on the worker nodes in the Ray Dashboard:

Ray Dashboard showing three actors running

Summary

To sum up, we implemented the tree of actors design pattern, in which a Supervisor launches worker models. Each model, in our case, is a separate regression model trained on the same data. The tree of actors pattern facilitates the Same Data Different Function (SDDF) scaling pattern, a common machine learning workload.

Of the three regression models trained in parallel on the same dataset, random forest achieved the lowest MSE (0.2642), with XGBoost close behind (0.2711) while training in roughly a quarter of the time (0.89 versus 3.39 seconds). This gives us an initial set of baseline models to experiment with further, for example through hyperparameter optimization. For that I would turn to Ray Train and Ray Tune, which are part of Ray AIR.

Actor-based batch inference

In Remote Ray Tasks we covered a use case that parallelizes batch inference. In short, we used the Different Data Same Function (DDSF) pattern. Here we use the same pattern but with Ray actors and ActorPool; actors are stateful and are the preferred way to do batch inference. The elements remain largely the same, with a few modifications.

  • Input dataset: This is a large collection of observations to generate predictions for. The data is usually stored in an external storage system like S3, HDFS or database, across many files.
  • ML model: This is a trained ML model that is usually also stored in an external storage system or in a model store.
  • Predictions: These are the outputs of applying the ML model to the observations. Predictions are usually written back to the storage system. Instead of using tasks to make the predictions, we employ a pool of actors.

For the purposes of this demo, we make the following provisions:

  • create a dummy model that returns a fake prediction
  • use real-world NYC taxi data to provide a large dataset for batch inference
  • create a pool of actors and submit each shard to the pool
  • return the predictions instead of writing them back to disk

This is an example of the scaling pattern called Different Data Same Function (DDSF), also known as the Distributed Data Parallel (DDP) paradigm. In this diagram, the function is the pretrained model, and the data is split and distributed as shards.

Different data same function is another scaling pattern
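
Stripped of Ray, the DDSF idea is simply to split the data into shards and apply one function to every shard. The following sketch is a plain-Python illustration under that assumption; `make_shards` and `predict_tips` are hypothetical helpers, and the built-in `map` stands in for the parallel dispatch that the actors perform later in this section.

```python
from typing import List


def make_shards(rows: List[int], num_shards: int) -> List[List[int]]:
    """Split rows into num_shards nearly equal, contiguous shards."""
    size, extra = divmod(len(rows), num_shards)
    shards, start = [], 0
    for i in range(num_shards):
        # The first `extra` shards each take one additional row.
        end = start + size + (1 if i < extra else 0)
        shards.append(rows[start:end])
        start = end
    return shards


def predict_tips(passenger_counts: List[int]) -> List[bool]:
    """Same function for every shard: predict a tip for 2 or more passengers."""
    return [count >= 2 for count in passenger_counts]


passenger_counts = [1, 2, 3, 1, 1, 4, 2, 1, 5, 1]
shards = make_shards(passenger_counts, num_shards=3)
# Sequential stand-in for the parallel workers.
predictions = list(map(predict_tips, shards))
print([len(s) for s in shards], [sum(p) for p in predictions])
```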

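import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import ray
from ray.util import ActorPool

NUM_ACTORS = 6             # You can always increase the number of actors to scale
NUM_SHARD_FILES = 12       # total number of shard files to distribute across the actors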
 
# Our load model closure remains the same
def load_trained_model():
    # A fake model that predicts whether tips were given based on number of passengers in the taxi cab.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        # Some model payload so Ray copies the model in the shared plasma store to tasks scheduled across nodes.
        model.payload = np.arange(10, 10_000, dtype=float)
        #model.payload = np.arange(100, 100_000_000, dtype=float)
        model.cls = "regression"
        # give a tip if 2 or more passengers
        predict = batch["passenger_count"] >= 2
        return pd.DataFrame({"score": predict})
    return model

We also create a Ray Actor that stores a model reference and does the prediction:

@ray.remote
class NYCBatchPredictor:
    def __init__(self, model):
        self.model = model
 
    def predict(self, shard_path):
        # read each shard and convert to pandas
        df = pq.read_table(shard_path).to_pandas()
 
        # do the inference with our model and return the result
        result = self.model(df)
        return result

Next, we get our trained model instance and store it in Ray's plasma object store.

model = load_trained_model()
model_ref = ray.put(model)

Fetch our NYC taxi shard files:

# Iterate through our NYC files (~2 GB)
input_shard_files = [
    f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
    for i in range(NUM_SHARD_FILES) ]
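
The comprehension above relies on Python's implicit concatenation of adjacent string literals and on the `{i:06d}` format spec, which zero-pads the shard index to six digits. A quick, self-contained check of what it generates (no S3 access is needed just to build the paths):

```python
NUM_SHARD_FILES = 12

input_shard_files = [
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
    for i in range(NUM_SHARD_FILES)
]

# Show the count and the file names of the first and last shards.
print(len(input_shard_files))
print(input_shard_files[0].rsplit("/", 1)[-1])
print(input_shard_files[-1].rsplit("/", 1)[-1])
```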

To do batch inference, we do the following:

  1. Create six actor instances (NUM_ACTORS), each initialized with the same model reference
  2. Create a pool of those six actors

We use the Ray actor pool utility ActorPool; see the ActorPool API reference for details.

actors = [NYCBatchPredictor.remote(model_ref) for _ in range(NUM_ACTORS)]
actors_pool = ActorPool(actors)
 
# Submit each shard to the pool of actors for batch inference
# The API syntax is similar to the Python and Ray multiprocessing Pool APIs
for shard_path in input_shard_files:
    # Submit file shard for prediction to the pool
    actors_pool.submit(lambda actor, shard: actor.predict.remote(shard), shard_path)
 
# Iterate over the finished actors' predictions
while actors_pool.has_next():
    r = actors_pool.get_next()
    print(f"Predictions dataframe size: {len(r)} | Total score for tips: {r['score'].sum()}")

Which returns:

Predictions dataframe size: 141062 | Total score for tips: 46360
Predictions dataframe size: 133932 | Total score for tips: 42175
Predictions dataframe size: 144014 | Total score for tips: 45175
Predictions dataframe size: 143087 | Total score for tips: 45510
Predictions dataframe size: 148108 | Total score for tips: 47713
Predictions dataframe size: 141981 | Total score for tips: 45188
Predictions dataframe size: 136394 | Total score for tips: 43234
Predictions dataframe size: 136999 | Total score for tips: 45142
Predictions dataframe size: 139985 | Total score for tips: 44138
Predictions dataframe size: 156198 | Total score for tips: 49909
Predictions dataframe size: 142893 | Total score for tips: 46112
Predictions dataframe size: 145976 | Total score for tips: 48036

Summary

The first part of this lesson demonstrated the tree of actors design pattern, commonly used in Ray for writing distributed applications. In particular, Ray's native libraries such as Train, Tune, Serve, and RLlib, as well as Ray AIR's components, use it for distributed training and tuning trials.

Additionally, we implemented the DDSF scaling pattern with an actor-based predictor, using the ActorPool utility class instead of tasks. Task-based batch inference has an overhead that can be significant if the model is large, since each task has to fetch the model from the driver's plasma store. We can avoid much of that cost by using Ray actors, which fetch the model just once and reuse it for all predictions assigned to the same actor in the pool.
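
The saving can be illustrated without Ray. In the following plain-Python sketch, a counter tracks how often a hypothetical expensive model load happens: the task-style function loads the model on every call, whereas the actor-style class loads it once in its constructor and reuses it. `load_model`, `predict_as_task`, and `ActorStylePredictor` are illustrative names, and in Ray the real cost is fetching the model from the object store rather than a Python function call.

```python
load_count = 0


def load_model():
    """Hypothetical expensive load; the counter records how often it runs."""
    global load_count
    load_count += 1
    return lambda passenger_count: passenger_count >= 2


def predict_as_task(passenger_count: int) -> bool:
    # Task style: stateless, so the model is fetched on every call.
    return load_model()(passenger_count)


class ActorStylePredictor:
    # Actor style: the model is fetched once and kept as instance state.
    def __init__(self) -> None:
        self.model = load_model()

    def predict(self, passenger_count: int) -> bool:
        return self.model(passenger_count)


batch = [1, 2, 3, 1]

task_predictions = [predict_as_task(x) for x in batch]
task_loads = load_count

load_count = 0
predictor = ActorStylePredictor()
actor_predictions = [predictor.predict(x) for x in batch]
actor_loads = load_count

print(task_loads, actor_loads)
```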


Resources: