Ray: [Docs] Add example on how to implement checkpointing for Trainable in Ray Tune

Created on 16 Jan 2018 · 5 Comments · Source: ray-project/ray

At the moment, the documentation is not very helpful here; it basically says "figure it out from the source code of agents.py". We should have a clean, simple example, such as hyperparameter tuning of the SGD learning rate for minimizing f(\theta) = \|\theta\|^2, or something even simpler if we have a good idea.
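A minimal sketch of the kind of example proposed above, written without Ray so that it runs on its own. QuadraticSGD is a hypothetical class that only mirrors the train()/save()/restore() method names discussed in this thread; it is not Ray's Trainable, and the save(checkpoint_dir) signature, the JSON checkpoint format, and the starting value of theta are assumptions of the sketch. The gradient of ||theta||^2 is 2*theta, so each step multiplies theta by (1 - 2*lr), and the loss shrinks only for 0 < lr < 1, which is what makes the learning rate worth tuning.

```python
import json
import os
import tempfile


class QuadraticSGD:
    """Hypothetical, Ray-free stand-in for a checkpointable Trainable.

    Minimizes f(theta) = ||theta||^2 with plain gradient descent. Only the
    train()/save()/restore() method names follow this thread.
    """

    def __init__(self, config):
        self.lr = config["lr"]
        self.theta = [1.0, -2.0, 3.0]  # arbitrary starting point (assumption)
        self.iteration = 0

    def loss(self):
        # f(theta) = ||theta||^2
        return sum(t * t for t in self.theta)

    def train(self):
        # One logical iteration = one gradient step; grad f(theta) = 2 * theta.
        self.theta = [t - self.lr * 2.0 * t for t in self.theta]
        self.iteration += 1
        return {"iteration": self.iteration, "loss": self.loss()}

    def save(self, checkpoint_dir):
        # Persist everything needed to resume: parameters and iteration count.
        path = os.path.join(checkpoint_dir, "checkpoint_%d.json" % self.iteration)
        with open(path, "w") as f:
            json.dump({"theta": self.theta, "iteration": self.iteration}, f)
        return path

    def restore(self, checkpoint_path):
        # Load a checkpoint previously returned by save().
        with open(checkpoint_path) as f:
            state = json.load(f)
        self.theta = state["theta"]
        self.iteration = state["iteration"]


# Stand-in for grid_search([...]) over the learning rate.
for lr in [0.01, 0.1, 0.4, 1.1]:
    trial = QuadraticSGD({"lr": lr})
    for _ in range(10):
        result = trial.train()
    print("lr=%s loss=%.3g" % (lr, result["loss"]))

# Checkpoint round trip: a fresh instance resumes where the first one stopped.
ckpt_dir = tempfile.mkdtemp()
first = QuadraticSGD({"lr": 0.1})
for _ in range(5):
    first.train()
resumed = QuadraticSGD({"lr": 0.1})
resumed.restore(first.save(ckpt_dir))
print(resumed.iteration, resumed.theta == first.theta)
```

With 10 steps, lr=0.4 ends with the lowest loss and lr=1.1 diverges, since |1 - 2*1.1| > 1. Because each train() call is one small step, a scheduler can compare trials between iterations, and save()/restore() let a paused trial continue from the same theta.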

cc @rshin


This kind of doc would be much appreciated. I would like to use HyperBand with ray.tune, and it's pretty difficult to understand the workflow well enough to get everything properly defined. For instance, the example Agent class has a lot going on in its __init__ method. Which parts of that are necessary to register the class and run experiments?

I've been trying to work out a dummy example that runs 3 experiments with a simple RF classifier, and have been pretty unsuccessful:

import numpy as np
import time
import os
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import pickle
import ray
from ray.tune import register_trainable, grid_search, run_experiments
from ray.tune.trainable import Trainable
from ray.tune.result import TrainingResult
from ray.tune.registry import ENV_CREATOR, get_registry

X, y = make_classification(n_samples=1000, n_features=4,
                           n_informative=2, n_redundant=0,
                           random_state=0, shuffle=False)

class TestClass(Trainable):
    def __init__(self, X_train, y_train, config={"max_depth":2}):
        self.config = config
        self.X_train = X_train
        self.y_train = y_train
        self.clf = RandomForestClassifier(max_depth=self.config['max_depth'], random_state=0)

    def train(self):
        """Runs one logical iteration of training.
        Returns:
            A TrainingResult that describes training progress.
        """
        self.clf.fit(self.X_train, self.y_train)

        return TrainingResult(mean_accuracy = self.clf.score(self.X_train, self.y_train),
                              timesteps_this_iter=10)

    def save(self):
        """Saves the current model state to a checkpoint.
        Returns:
            Checkpoint path that may be passed to restore().
        """
        path = "agent_test.pkl"
        with open(path, 'wb') as f:
            pickle.dump(self.clf, f)
        return path

    def restore(self, checkpoint_path):
        """Restores training state from a given model checkpoint.
        These checkpoints are returned from calls to save().
        """
        path = checkpoint_path
        with open(path, 'rb') as f:
            self.clf = pickle.load(f)
        return path

    def stop(self):
        """Releases all resources used by this class."""
        pass  

register_trainable("TestClass", TestClass)

run_experiments({
    "my_experiment": {
        "run": "TestClass",
        "resources": { "cpu": 1, "gpu": 0 },
        "stop": { 'timesteps_total':1 },
        "config": {
            "max_depth": grid_search([2,4,6])
        },
    }
})

Here is my traceback (repeated a few times per experiment):

== Status ==
Using FIFO scheduling algorithm.
Tensorboard logdir: /tmp/ray/my_experiment
 - TestClass_0_max_depth=2: PENDING
 - TestClass_1_max_depth=4: PENDING
 - TestClass_2_max_depth=6: PENDING

Waiting for redis server at 127.0.0.1:34788 to respond...
Waiting for redis server at 127.0.0.1:25231 to respond...
Starting local scheduler with 12 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8891/notebooks/ray_ui70302.ipynb?token=316d2af9c67566bd7d6566a3f1d5442a45057e983e4acee1
======================================================================

Unified logger created with logdir '/tmp/ray/my_experiment/TestClass_0_max_depth=23087u3rt'
Error starting runner, retrying: Traceback (most recent call last):
  File "/home/ilya/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/tune/trial_runner.py", line 143, in _launch_trial
    trial.start()
  File "/home/ilya/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/tune/trial.py", line 112, in start
    self._setup_runner()
  File "/home/ilya/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/tune/trial.py", line 307, in _setup_runner
    logger_creator=lambda config: NoopLogger(config, remote_logdir))
  File "/home/ilya/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/actor.py", line 734, in remote
    kwargs=kwargs)
  File "/home/ilya/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/actor.py", line 578, in _actor_method_call
    args = signature.extend_args(function_signature, args, kwargs)
  File "/home/ilya/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/signature.py", line 187, in extend_args
    .format(keyword_name, function_name))
Exception: The name 'registry' is not a valid keyword argument for the function '__init__'.


...


== Status ==
Using FIFO scheduling algorithm.
Resources used: 3/12 CPUs, 0/0 GPUs
Tensorboard logdir: /tmp/ray/my_experiment
 - TestClass_0_max_depth=2: ERROR
 - TestClass_1_max_depth=4: ERROR
 - TestClass_2_max_depth=6: ERROR

---------------------------------------------------------------------------
TuneError                                 Traceback (most recent call last)
<ipython-input-5-7ed06803d32d> in <module>()
      5         "stop": { 'timesteps_total':1 },
      6         "config": {
----> 7             "max_depth": grid_search([2,4,6])
      8         },
      9     }

~/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/tune/tune.py in run_experiments(experiments, scheduler, **ray_args)
     80     for trial in runner.get_trials():
     81         if trial.status != Trial.TERMINATED:
---> 82             raise TuneError("Trial did not complete", trial)
     83 
     84     return runner.get_trials()

TuneError: ('Trial did not complete', <ray.tune.trial.Trial object at 0x7fe8096e4320>)

If I add the registry keyword to my trainable class's __init__, I get the same error. If I instead reproduce what the example Agent class does and add registry=get_registry() as a default argument, the class definition itself fails:

    def __init__(self, X_train, y_train, config={"max_depth":2}, registry=get_registry()):
        self.config = config
        self.registry = registry
        self.X_train = X_train
        self.y_train = y_train
        self.clf = RandomForestClassifier(max_depth=self.config['max_depth'], random_state=0)
RayConnectionError                        Traceback (most recent call last)
<ipython-input-3-c22cabb821de> in <module>()
----> 1 class TestClass(Trainable):
      2     def __init__(self, X_train, y_train, config={"max_depth":2}, registry=get_registry()):
      3         self.config = config
      4         self.registry = registry
      5         self.X_train = X_train

<ipython-input-3-c22cabb821de> in TestClass()
      1 class TestClass(Trainable):
----> 2     def __init__(self, X_train, y_train, config={"max_depth":2}, registry=get_registry()):
      3         self.config = config
      4         self.registry = registry
      5         self.X_train = X_train

~/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/tune/registry.py in get_registry()
     50     """Use this to access the registry. This requires ray to be initialized."""
     51 
---> 52     _default_registry.flush_values_to_object_store()
     53 
     54     # returns a registry copy that doesn't include the hard refs

~/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/tune/registry.py in flush_values_to_object_store(self)
     80         for k, v in self._all_objects.items():
     81             if type(v) != ObjectID:
---> 82                 obj = ray.put(v)
     83                 self._all_objects[k] = obj
     84                 self._refs.append(ray.get(obj))

~/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/worker.py in put(value, worker)
   2187         The object ID assigned to this value.
   2188     """
-> 2189     check_connected(worker)
   2190     with log_span("ray:put", worker=worker):
   2191         check_main_thread()

~/anaconda2/envs/alnn/lib/python3.6/site-packages/ray/worker.py in check_connected(worker)
    956     """
    957     if not worker.connected:
--> 958         raise RayConnectionError("This command cannot be called before Ray "
    959                                  "has been started. You can start Ray with "
    960                                  "'ray.init()'.")

RayConnectionError: This command cannot be called before Ray has been started. You can start Ray with 'ray.init()'.

If I call ray.init() myself first, run_experiments() then fails, because it also calls ray.init(), which can't be called more than once.

It would be great to have a trivial example showing how to set all of this up.
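The RayConnectionError above is raised while the class statement is still executing: Python evaluates default argument values once, when `def __init__` runs as part of building the class, which here happens before run_experiments() has started Ray. The same rule means the mutable default config={"max_depth":2} is a single dict shared by every instance. A stdlib-only illustration, where get_registry_stub is a hypothetical stand-in for get_registry() and Fixed uses the common `None`-default idiom together with the registry=None signature given later in this thread:

```python
calls = []


def get_registry_stub():
    """Hypothetical stand-in for get_registry(); records every call."""
    calls.append("get_registry_stub")
    return {}


class Broken:
    # Both defaults are evaluated once, when this `def` executes while the
    # class body is being built, not each time Broken() is instantiated.
    def __init__(self, config={"max_depth": 2}, registry=get_registry_stub()):
        self.config = config
        self.registry = registry


class Fixed:
    # Defaults of None are cheap and immutable; real values are built per call.
    def __init__(self, config=None, registry=None):
        self.config = dict(config) if config is not None else {"max_depth": 2}
        self.registry = registry


print(len(calls))  # the stub already ran, before any instance was created
a, b = Broken(), Broken()
print(a.config is b.config)  # True: one dict shared by every instance
c, d = Fixed(), Fixed()
print(c.config is d.config)  # False: each instance gets its own dict
```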

We should be able to fix this as part of https://github.com/ray-project/ray/issues/1364 (in progress). After that change, we can provide an abstract trainable class that can be extended by the user.

@ibarshai for now, the following would fix your example:

 class TestClass(Trainable):
-    def __init__(self, X_train, y_train, config={"max_depth":2}):
+    def __init__(self, config={"max_depth":2}, **kwargs):
         self.config = config
-        self.X_train = X_train
-        self.y_train = y_train
+        self.X_train = X
+        self.y_train = y
         self.clf = RandomForestClassifier(max_depth=self.config['max_depth'], random_state=0)

     def train(self):
@@ -29,7 +29,7 @@
         self.clf.fit(self.X_train, self.y_train)

         return TrainingResult(mean_accuracy = self.clf.score(self.X_train, self.y_train),
-                              timesteps_this_iter=10)
+                              timesteps_this_iter=10, time_total_s=1, time_this_iter_s=1, timesteps_total=10)

     def save(self):
         """Saves the current model state to a checkpoint.

Note that HyperBand won't provide any benefit unless the RF training can be broken up into multiple training iterations.
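In the example, every train() call refits the whole forest from scratch, so each iteration repeats the same work. One way to break the training into iterations, assuming scikit-learn's warm_start option is acceptable here (it is a scikit-learn feature, not something Ray provides), is to add a fixed number of trees per train() call. IncrementalForest and trees_per_iter are hypothetical names, and the returned dict stands in for TrainingResult:

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=1000, n_features=4,
                           n_informative=2, n_redundant=0,
                           random_state=0, shuffle=False)


class IncrementalForest:
    """Hypothetical sketch: each train() call grows the forest by a few trees."""

    def __init__(self, config, trees_per_iter=10):
        self.trees_per_iter = trees_per_iter
        self.iteration = 0
        # warm_start=True tells fit() to keep the trees it already has and
        # only fit the extra ones when n_estimators is raised.
        self.clf = RandomForestClassifier(n_estimators=trees_per_iter,
                                          max_depth=config["max_depth"],
                                          warm_start=True, random_state=0)

    def train(self):
        # Raise the target tree count, then fit only the newly added trees.
        self.iteration += 1
        self.clf.set_params(n_estimators=self.iteration * self.trees_per_iter)
        self.clf.fit(X, y)
        return {"n_trees": len(self.clf.estimators_),
                "mean_accuracy": self.clf.score(X, y)}


forest = IncrementalForest({"max_depth": 4})
for _ in range(3):
    print(forest.train())
```

Each call now does a bounded, incremental amount of work, which gives a scheduler intermediate results to compare before a trial has used its full budget.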

We should also probably move ray.init() out of run_experiments(); that would allow X and y above to be broadcast via ray.put() instead of being closed over by the class.
Edit: I guess you can't close over ObjectIds, and letting the object get serialized with the class definition would be inefficient. An alternative would be to define X and y in a separate file that you import from the class. Filed https://github.com/ray-project/ray/issues/1453 for this.
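A minimal sketch of that separate-file approach, shown as one file for brevity. load_data and DataBackedTrainable are hypothetical names, the random toy data stands in for make_classification, and functools.lru_cache is an added assumption so that repeated construction in the same worker process reuses one copy instead of rebuilding the data. Assuming the module is importable on every worker, the trainable refers to load_data by import rather than closing over driver-side globals, so the data itself does not need to travel with the class definition.

```python
import functools
import random


# --- would live in a separate, importable module, e.g. a hypothetical data_module.py ---
@functools.lru_cache(maxsize=None)
def load_data(n_samples=1000, n_features=4, seed=0):
    """Build (or read from disk) the dataset once per process, then cache it."""
    rng = random.Random(seed)
    X = [[rng.gauss(0.0, 1.0) for _ in range(n_features)]
         for _ in range(n_samples)]
    y = [int(row[0] + row[1] > 0) for row in X]
    return X, y


# --- in the trainable's file: `from data_module import load_data` ---
class DataBackedTrainable:
    """Hypothetical trainable that fetches its data instead of closing over it."""

    def __init__(self, config):
        self.config = config
        # Every instance in the same process shares one cached copy.
        self.X_train, self.y_train = load_data()


trials = [DataBackedTrainable({"max_depth": d}) for d in (2, 4, 6)]
print(load_data.cache_info().misses)  # the dataset was built only once
```

Each worker process still builds the data once. The cached lists are shared between instances in that process, so trainables should not modify them in place.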

Thanks for the response, @ericl!

I made the change you suggested to __init__ (and to the return value of the train method), so my Trainable class now looks like this:

class TestClass(Trainable):
    def __init__(self, config={"max_depth":2}, **kwargs):
        self.config = config
        self.X_train = X
        self.y_train = y
        self.clf = RandomForestClassifier(max_depth=self.config['max_depth'], random_state=0)

but after running register_trainable and run_experiments, I still get the same exception:

Exception: The name 'registry' is not a valid keyword argument for the function '__init__'.

My bad, it should be def __init__(self, config={"max_depth":2}, registry=None, logger_creator=None):
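With the constructor fixed, one more thing is worth checking in the save() method of the original example: it always writes to agent_test.pkl in the current working directory, so trials running in parallel can overwrite each other's checkpoints, and every save also replaces the previous one. A minimal sketch of one way around this, assuming a per-instance temporary directory is acceptable, keeps the same pickle-based save()/restore() pair. CheckpointingModel and the file naming are hypothetical, and this is not necessarily how Ray itself manages checkpoints:

```python
import os
import pickle
import tempfile


class CheckpointingModel:
    """Hypothetical stand-in for the save()/restore() half of TestClass."""

    def __init__(self, model):
        self.model = model  # e.g. the fitted RandomForestClassifier
        # A private directory per instance, so parallel trials never share a file.
        self._ckpt_dir = tempfile.mkdtemp(prefix="trial_")
        self._num_saves = 0

    def save(self):
        # A new file for every save, so earlier checkpoints stay restorable.
        self._num_saves += 1
        path = os.path.join(self._ckpt_dir, "checkpoint_%d.pkl" % self._num_saves)
        with open(path, "wb") as f:
            pickle.dump(self.model, f)
        return path

    def restore(self, checkpoint_path):
        # Load a checkpoint previously returned by save().
        with open(checkpoint_path, "rb") as f:
            self.model = pickle.load(f)


a = CheckpointingModel({"max_depth": 2})
b = CheckpointingModel({"max_depth": 4})
path_a, path_b = a.save(), b.save()
print(path_a != path_b)  # True: the two trials wrote to different files
```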

Docs and example have been merged.
