Ray: What is the proper way to submit a job to the cluster?

Created on 2 Aug 2019 · 4 comments · Source: ray-project/ray

If I have a local cluster, brought up with the ray up command, what is the proper way to start, e.g., an IMPALA job on the cluster?

  1. ssh into the head node and run python my_script.py, or
  2. ray submit cluster.yaml --tmux --start --stop my_script.py?

It is not really clear whether running a Python script on any node with ray.init(redis_address) is sufficient to distribute the load across the cluster. For some reason my head node always takes all of the work, even though my cluster has 2 workers.

For example, the docs (https://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html) say that you can run code on the cluster as follows:

import ray
ray.init(redis_address="<redis-address>")

import time

@ray.remote
def f():
    time.sleep(0.01)
    return ray.services.get_node_ip_address()

# Get a list of the IP addresses of the nodes that have joined the cluster.
set(ray.get([f.remote() for _ in range(1000)]))

This works. But how do I apply it to the RLlib library with the IMPALA or APPO algorithms?
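
For concreteness, this is the kind of script I have in mind (just a sketch, assuming the IMPALA trainer is importable as ray.rllib.agents.impala.ImpalaTrainer; names and config keys may differ by version):

import ray
from ray.rllib.agents.impala import ImpalaTrainer  # assumed import path

if __name__ == "__main__":
    # Connect to the existing cluster instead of starting a new local Ray instance.
    ray.init(redis_address="<redis-address>")

    trainer = ImpalaTrainer(
        env="CartPole-v0",
        config={
            # Ask for more rollout workers than the head node has CPUs,
            # so that some of them should land on the worker nodes.
            "num_workers": 8,
        })

    for _ in range(10):
        result = trainer.train()
        print(result["episode_reward_mean"])

Would that be enough to spread the rollout workers across the nodes?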

Most helpful comment

If you're using rllib train, you can pass in --redis-address.

If you're using a Python script and you want to use the entire cluster, rllib never autodetects your cluster (because you can have multiple Ray clusters at a time). You can append ray.init(redis_address="<redis_address>") to your script, as @kivo360 mentioned.

ray submit [script] is just a wrapper around ssh user@<ip-address> python [script].

Hope that helps.

All 4 comments

I was trying to figure this out too. The documentation isn't clear when you want to have the cluster run separately from the rest of the code. I have a couple of hypotheses for connecting to RLlib.

As for running RLlib with async setups, I would guess (don't hold me to it) that you could create an agent that connects to the cluster in the same way and specify the number of workers you want it to have.

Pseudo-ish code, adapted from the policy server example.

"""Example of running a policy server. Copy this file for your use case.

To try this out, in two separate shells run:
    $ python cartpole_server.py
    $ python cartpole_client.py
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
from gym import spaces
import numpy as np

import ray
from ray.rllib.agents.ppo import PPOAgent
from ray.rllib.env.external_env import ExternalEnv
from ray.rllib.utils.policy_server import PolicyServer
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env

SERVER_ADDRESS = "localhost"
SERVER_PORT = 9900
CHECKPOINT_FILE = "last_checkpoint.out"


class CartpoleServing(ExternalEnv):
    def __init__(self):
        ExternalEnv.__init__(
            self, spaces.Discrete(2),
            spaces.Box(low=-10, high=10, shape=(4, ), dtype=np.float32))

    def run(self):
        print("Starting policy server at {}:{}".format(SERVER_ADDRESS,
                                                       SERVER_PORT))
        server = PolicyServer(self, SERVER_ADDRESS, SERVER_PORT)
        server.serve_forever()

if __name__ == "__main__":
    # This would be your point of leverage
    ray.init(redis_address="<redis-address>")
    register_env("srv", lambda _: CartpoleServing())

    # We use PPO since it supports off-policy actions, but you can choose and
    # configure any agent.
    ppo = PPOAgent(
        env="srv",
        config={
            # Spread rollout workers across the cluster
            "num_workers": 16,
            "num_gpus": 4,
            # Configure the agent to run short iterations for debugging
            # "exploration_fraction": 0.01,
            # "learning_starts": 100,
            "model": {
                "use_lstm": True
            },
            "timesteps_per_iteration": 200,
        })

    # Attempt to restore from checkpoint if possible.
    if os.path.exists(CHECKPOINT_FILE):
        checkpoint_path = open(CHECKPOINT_FILE).read()
        print("Restoring from checkpoint path", checkpoint_path)
        ppo.restore(checkpoint_path)

    # Serving and training loop
    while True:
        print(pretty_print(ppo.train()))
        checkpoint_path = ppo.save()
        print("Last checkpoint", checkpoint_path)
        with open(CHECKPOINT_FILE, "w") as f:
            f.write(checkpoint_path)

On the env side, you'd lay out your code as follows.

"""Example of querying a policy server. Copy this file for your use case.

To try this out, in two separate shells run:
    $ python cartpole_server.py
    $ python cartpole_client.py
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import gym

from ray.rllib.utils.policy_client import PolicyClient

parser = argparse.ArgumentParser()
parser.add_argument(
    "--no-train", action="store_true", help="Whether to disable training.")
parser.add_argument(
    "--off-policy",
    action="store_true",
    help="Whether to take random instead of on-policy actions.")
parser.add_argument(
    "--stop-at-reward",
    type=int,
    default=9999,
    help="Stop once the specified reward is reached.")

if __name__ == "__main__":
    args = parser.parse_args()
    # You'd make your env here. SubProcEnv can be used to parallelize the env,
    # or other worker methods provided by Ray.
    env = gym.make("CartPole-v0")
    client = PolicyClient("http://localhost:9900")

    eid = client.start_episode(training_enabled=not args.no_train)
    obs = env.reset()
    rewards = 0

    while True:
        if args.off_policy:
            action = env.action_space.sample()
            client.log_action(eid, obs, action)
        else:
            action = client.get_action(eid, obs)
        obs, reward, done, info = env.step(action)
        rewards += reward
        client.log_returns(eid, reward, info=info)
        if done:
            print("Total reward:", rewards)
            if rewards >= args.stop_at_reward:
                print("Target reward achieved, exiting")
                exit(0)
            rewards = 0
            client.end_episode(eid, obs)
            obs = env.reset()
            eid = client.start_episode(training_enabled=not args.no_train)

Thanks. I kind of thought APEX & friends would handle this automatically.
Indeed, if I run ApexTrainer with the redis_address of the head node, it seems to distribute the work. I'm still checking this, however...
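
Roughly what I'm using to check (just a sketch, assuming the Apex trainer is importable as ray.rllib.agents.dqn.ApexTrainer and that ray.cluster_resources() is available; both may differ by version):

import ray
from ray.rllib.agents.dqn import ApexTrainer  # assumed import path

# Connect to the head node (6379 is the default redis port).
ray.init(redis_address="<head-node-ip>:6379")
print(ray.cluster_resources())  # should show the CPUs/GPUs of all nodes

# Ask for more rollout workers than the head node has CPUs, so Ray has to
# place some of them on the worker nodes.
trainer = ApexTrainer(env="CartPole-v0", config={"num_workers": 32})

result = trainer.train()
print(result["episode_reward_mean"])

If the workers are really spread out, CPU usage should show up on the worker nodes as well, not only on the head.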

If you're using rllib train, you can pass in --redis-address.
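
For example (a sketch; the exact flag spellings may vary between versions):

    rllib train --run=IMPALA --env=CartPole-v0 --redis-address=<head-node-ip>:6379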

If you're using a Python script and you want to use the entire cluster, rllib never autodetects your cluster (because you can have multiple Ray clusters at a time). You can append ray.init(redis_address="<redis_address>") to your script, as @kivo360 mentioned.

ray submit [script] is just a wrapper around ssh user@<ip-address> python [script].

Hope that helps.

Yes, thank you.
