If I have a local cluster, brought up with the ray up command, what is the proper way to start e.g. an IMPALA job on the cluster?
python my_script.py or ray submit cluster.yaml --tmux --start --stop my_script.py? It is not really clear whether running a python script on any node with ray.init(redis_address) is sufficient to distribute the load across the cluster. For some reason my head node always takes all of the work, even though my cluster has 2 workers.
For example, the doc says (https://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html) that you can run code on the cluster as follows:
import ray
ray.init(redis_address="<redis-address>")

import time

@ray.remote
def f():
    time.sleep(0.01)
    return ray.services.get_node_ip_address()

# Get a list of the IP addresses of the nodes that have joined the cluster.
set(ray.get([f.remote() for _ in range(1000)]))
Which works. But how do I apply this to the RLlib library with the IMPALA or APPO algorithms?
I was trying to figure this out too. The documentation isn't clear for the case where you want to run the cluster separately from the rest of the code. I have a couple of hypotheses for connecting to RLlib.
As for running RLlib with async setups, I would guess (don't hold me to it) that you could create an agent that connects to the cluster in the same way and specify the number of workers you want the agent to have.
Pseudo-ish code, adapted from the policy server example.
"""Example of running a policy server. Copy this file for your use case.

To try this out, in two separate shells run:
    $ python cartpole_server.py
    $ python cartpole_client.py
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

from gym import spaces
import numpy as np

import ray
from ray.rllib.agents.ppo import PPOAgent
from ray.rllib.env.external_env import ExternalEnv
from ray.rllib.utils.policy_server import PolicyServer
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env

SERVER_ADDRESS = "localhost"
SERVER_PORT = 9900
CHECKPOINT_FILE = "last_checkpoint.out"


class CartpoleServing(ExternalEnv):
    def __init__(self):
        ExternalEnv.__init__(
            self, spaces.Discrete(2),
            spaces.Box(low=-10, high=10, shape=(4, ), dtype=np.float32))

    def run(self):
        print("Starting policy server at {}:{}".format(SERVER_ADDRESS,
                                                       SERVER_PORT))
        server = PolicyServer(self, SERVER_ADDRESS, SERVER_PORT)
        server.serve_forever()


if __name__ == "__main__":
    # This would be your point of leverage: connect to the existing cluster.
    ray.init(redis_address="<redis-address>")
    register_env("srv", lambda _: CartpoleServing())

    # We use PPO since it supports off-policy actions, but you can choose and
    # configure any agent.
    ppo = PPOAgent(
        env="srv",
        config={
            # Scale rollouts across the cluster instead of a single process.
            "num_workers": 16,
            "num_gpus": 4,
            # Configure the agent to run short iterations for debugging
            # "exploration_fraction": 0.01,
            # "learning_starts": 100,
            "model": {
                "use_lstm": True
            },
            "timesteps_per_iteration": 200,
        })

    # Attempt to restore from checkpoint if possible.
    if os.path.exists(CHECKPOINT_FILE):
        checkpoint_path = open(CHECKPOINT_FILE).read()
        print("Restoring from checkpoint path", checkpoint_path)
        ppo.restore(checkpoint_path)

    # Serving and training loop
    while True:
        print(pretty_print(ppo.train()))
        checkpoint_path = ppo.save()
        print("Last checkpoint", checkpoint_path)
        with open(CHECKPOINT_FILE, "w") as f:
            f.write(checkpoint_path)
On the env side, you'd lay out your code as follows.
"""Example of querying a policy server. Copy this file for your use case.

To try this out, in two separate shells run:
    $ python cartpole_server.py
    $ python cartpole_client.py
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse

import gym

from ray.rllib.utils.policy_client import PolicyClient

parser = argparse.ArgumentParser()
parser.add_argument(
    "--no-train", action="store_true", help="Whether to disable training.")
parser.add_argument(
    "--off-policy",
    action="store_true",
    help="Whether to take random instead of on-policy actions.")
parser.add_argument(
    "--stop-at-reward",
    type=int,
    default=9999,
    help="Stop once the specified reward is reached.")

if __name__ == "__main__":
    args = parser.parse_args()
    # You'd make your env here. SubProcEnv can be used to parallelize the env,
    # or other worker methods provided by ray.
    env = gym.make("CartPole-v0")
    client = PolicyClient("http://localhost:9900")

    eid = client.start_episode(training_enabled=not args.no_train)
    obs = env.reset()
    rewards = 0

    while True:
        if args.off_policy:
            action = env.action_space.sample()
            client.log_action(eid, obs, action)
        else:
            action = client.get_action(eid, obs)
        obs, reward, done, info = env.step(action)
        rewards += reward
        client.log_returns(eid, reward, info=info)
        if done:
            print("Total reward:", rewards)
            if rewards >= args.stop_at_reward:
                print("Target reward achieved, exiting")
                exit(0)
            rewards = 0
            client.end_episode(eid, obs)
            obs = env.reset()
            eid = client.start_episode(training_enabled=not args.no_train)
Thanks.
I kind of thought APEX & friends would handle this automatically.
Indeed, if I run ApexTrainer with the redis_address of the head node, it does seem to distribute the work. I'm still verifying this, however...
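For reference, here's roughly what I'm testing. This is only a minimal sketch: it assumes a Ray version where ApexTrainer can be imported from ray.rllib.agents.dqn and ray.init still accepts redis_address, and the env and config values are just placeholders.

import ray
from ray.rllib.agents.dqn import ApexTrainer

# Connect to the existing cluster started with `ray up`, instead of starting a local one.
ray.init(redis_address="<redis-address>")

trainer = ApexTrainer(
    env="CartPole-v0",      # stand-in env for illustration
    config={
        "num_workers": 8,   # rollout workers should get scheduled across the worker nodes
        "num_gpus": 0,      # set to the number of GPUs available to the learner
    },
)

for _ in range(10):
    result = trainer.train()
    print(result["episode_reward_mean"])

While it runs, the f.remote() trick from the docs snippet above is what I'm using to confirm which nodes have actually joined the cluster.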
If you're using rllib train, you can pass in --redis-address.
If you're using a python script and you want to use the entire cluster, note that rllib never autodetects your cluster (because you can have multiple ray clusters at a time). You add ray.init(redis_address="<redis_address>") to your script, as @kivo360 mentioned.
ray submit [script] is just a wrapper around ssh user@<ip-address> python [script].
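To make that concrete for the IMPALA case from the original question, a minimal sketch of such a script might look like this. The config values are illustrative, and it assumes a Ray version where tune.run is available and redis_address is still the accepted keyword.

import ray
from ray import tune

# Connect to the existing cluster started with `ray up`; do not start a new one.
ray.init(redis_address="<redis-address>")

tune.run(
    "IMPALA",
    stop={"training_iteration": 100},  # illustrative stopping criterion
    config={
        "env": "CartPole-v0",   # substitute your own environment
        "num_workers": 16,      # rollout workers get scheduled across the worker nodes
        "num_gpus": 1,          # GPUs for the learner on the head node; use 0 if none
    },
)

You can launch it on the head node directly (python my_script.py) or from your laptop with ray submit cluster.yaml my_script.py; either way the work is distributed because of the redis_address connection, not because of how the script was started.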
Hope that helps.
Yes, thank you.