I am using a multi-agent setup with PPO and PyTorch. I set up a basic environment and now want to run serving in this environment. This works fine with TensorFlow, but when using PyTorch the exception "can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first" is thrown, which seems to originate from the Torch PPO policy implementation.
Mock environment issue_world.py:
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding, EzPickle
from ray.rllib.env.multi_agent_env import MultiAgentEnv

class World(MultiAgentEnv, EzPickle):
    def __init__(self, env_config):
        EzPickle.__init__(self)
        self.seed()
        self.cfg = env_config
        self.observation_space = spaces.Dict({
            'id': spaces.Box(0, self.cfg['n_agents'], shape=(1,), dtype=np.int),
            'states': spaces.Dict({
                i: spaces.Dict({
                    'map': spaces.Box(0, 2, shape=(42, 42, 2), dtype=np.float32),
                    'pos': spaces.Box(low=np.array([0, 0]), high=np.array([42, 42]), dtype=np.int)
                }) for i in range(self.cfg['n_agents'])
            }),
        })
        self.action_space = spaces.Discrete(5)
        self.reset()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        self.agents = {f'a_{i}': np.array([0, 0]) for i in range(self.cfg['n_agents'])}
        self.dones = {key: False for key in self.agents.keys()}
        return self.step({key: 0 for key in self.agents.keys()})[0]

    def step(self, actions):
        states, rewards = {}, {}
        for key, action in actions.items():
            state = np.zeros((42, 42, 2))
            reward = 0
            if action == 1:
                self.agents[key][0] += 1
                reward = 1
            states[key] = state
            rewards[key] = reward
            if self.agents[key][0] > 10:
                self.dones[key] = True
        self.dones['__all__'] = any(self.dones.values())
        get_key_index = lambda key: int(list(self.agents.keys()).index(key))
        all_states = {get_key_index(key): {'map': np.zeros((42, 42, 2)), 'pos': self.agents[key]} for key in self.agents.keys()}
        final_states = {key: {'id': np.array([get_key_index(key)]), 'states': all_states} for key in actions.keys()}
        return final_states, rewards, self.dones, {}

    def render(self, mode='human'):
        pass
Custom Torch model issue_model.py:
from ray.rllib.utils import try_import_torch
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.misc import normc_initializer, valid_padding, SlimConv2d, SlimFC
from ray.rllib.utils.annotations import override
import numpy as np

torch, nn = try_import_torch()

class AdaptedVisionNetwork(TorchModelV2, nn.Module):
    """Generic vision network."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
        nn.Module.__init__(self)
        self.cfg = model_config['custom_options']
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        filters = [[16, [8, 8], 4], [32, [4, 4], 2], [32, [4, 4], 2]]
        layers = []
        (w, h, in_channels) = (42, 42, 2)
        in_size = [w, h]
        for out_channels, kernel, stride in filters[:-1]:
            padding, out_size = valid_padding(in_size, kernel, [stride, stride])
            layers.append(SlimConv2d(in_channels, out_channels, kernel, stride, padding))
            in_channels = out_channels
            in_size = out_size
        out_channels, kernel, stride = filters[-1]
        layers.append(SlimConv2d(in_channels, out_channels, kernel, stride, None))
        layers.append(nn.Flatten(1, -1))
        self._convs = nn.Sequential(*layers).to(self.device)
        self._logits = SlimFC(
            128, num_outputs, initializer=nn.init.xavier_uniform_).to(self.device)
        self._value_branch = SlimFC(
            128, 1, initializer=normc_initializer()).to(self.device)
        self._cur_value = None

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        featureMap = self._hidden_layers(input_dict["obs"]['states'][0]['map'])  # just for demo purposes
        logits = self._logits(featureMap)
        self._cur_value = self._value_branch(featureMap).squeeze(1)
        return logits, state

    @override(TorchModelV2)
    def value_function(self):
        assert self._cur_value is not None, "must call forward() first"
        return self._cur_value

    def _hidden_layers(self, obs):
        res = self._convs(obs.permute(0, 3, 1, 2))  # switch to channel-major
        return res
Model training script train.py (just run for one iteration):
import numpy as np
import gym
from gym import spaces
from issue_world import World
import matplotlib.pyplot as plt
import ray
from ray import tune
from ray.rllib.utils import try_import_torch
from ray.rllib.models import ModelCatalog
from issue_model import AdaptedVisionNetwork
from issue_model_tf import AdaptedVisionNetwork as AdaptedVisionNetworkTF

if __name__ == '__main__':
    ray.init()
    ModelCatalog.register_custom_model("vis_torch", AdaptedVisionNetwork)
    ModelCatalog.register_custom_model("vis_tf", AdaptedVisionNetworkTF)
    tune.run(
        "PPO",
        checkpoint_freq=1,
        config={
            "use_pytorch": True,
            "env": World,
            "num_sgd_iter": 10,
            "num_workers": 1,
            "num_envs_per_worker": 1,
            "num_gpus": 1,
            "model": {"custom_model": "vis_torch"},
            # "model": {"custom_model": "vis_tf"},
            "env_config": {
                'world_shape': (42, 42),
                'n_agents': 3
            }
        }
    )
Serving script serving_server.py, based on this (after the first checkpoint has been created by the previous script, put the correct checkpoint path into CHECKPOINT_FILE):
import os
from gym import spaces
import numpy as np
import ray
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.env.external_multi_agent_env import ExternalMultiAgentEnv
from ray.rllib.utils.policy_server import PolicyServer
from ray.tune.registry import register_env
from issue_model import AdaptedVisionNetwork
from issue_model_tf import AdaptedVisionNetwork as AdaptedVisionNetworkTF
from ray.rllib.models import ModelCatalog

SERVER_ADDRESS = "localhost"
SERVER_PORT = 9900
CHECKPOINT_FILE = "path/to/checkpoint"

class WorldServing(ExternalMultiAgentEnv):
    def __init__(self, config):
        self.cfg = config
        ExternalMultiAgentEnv.__init__(
            self, spaces.Discrete(5),
            spaces.Dict({
                'id': spaces.Box(0, self.cfg['n_agents'], shape=(1,), dtype=np.int),
                'states': spaces.Dict({
                    i: spaces.Dict({
                        'map': spaces.Box(0, 2, shape=(42, 42, 2), dtype=np.float32),
                        'pos': spaces.Box(low=np.array([0, 0]), high=np.array([42, 42]), dtype=np.int)
                    }) for i in range(self.cfg['n_agents'])
                }),
            }))

    def run(self):
        print("Starting policy server at {}:{}".format(SERVER_ADDRESS,
                                                       SERVER_PORT))
        server = PolicyServer(self, SERVER_ADDRESS, SERVER_PORT)
        server.serve_forever()

if __name__ == "__main__":
    ray.init()
    register_env("srv", lambda config: WorldServing(config))
    ModelCatalog.register_custom_model("vis_torch", AdaptedVisionNetwork)
    ModelCatalog.register_custom_model("vis_tf", AdaptedVisionNetworkTF)
    ppo = PPOTrainer(
        env="srv",
        config={
            "use_pytorch": True,
            "num_workers": 0,
            "timesteps_per_iteration": 200,
            "env_config": {
                'world_shape': (42, 42),
                'n_agents': 3
            },
            "model": {"custom_model": "vis_torch"}
            # "model": {"custom_model": "vis_tf"}
        })
    ppo.restore(CHECKPOINT_FILE)
    print("restored")
    while True:
        ppo.train()
And the client script serving_client.py, based on this (run it after the previous script has started successfully):
import gym
from issue_world import World
from ray.rllib.utils.policy_client import PolicyClient

if __name__ == "__main__":
    env = World({
        'world_shape': (42, 42),
        'n_agents': 3
    })
    client = PolicyClient("http://localhost:9900")
    eid = client.start_episode(training_enabled=False)
    obs = env.reset()
    while True:
        action = client.get_action(eid, obs)
        print(action)
        obs, reward, done, info = env.step(action)
        client.log_returns(eid, reward, info=info)
        # In the multi-agent case `done` is a dict; '__all__' signals episode end.
        if done['__all__']:
            client.end_episode(eid, obs)
            obs = env.reset()
            eid = client.start_episode(training_enabled=False)
Interestingly, the client receives exactly one action and then the server closes the connection. The output in the server window is:
Traceback (most recent call last):
  File "issue_serving_server.py", line 71, in <module>
    ppo.train()
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 497, in train
    raise e
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 483, in train
    result = Trainable.train(self)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/tune/trainable.py", line 254, in train
    result = self._train()
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer_template.py", line 133, in _train
    fetches = self.optimizer.step()
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/optimizers/sync_samples_optimizer.py", line 62, in step
    samples.append(self.workers.local_worker().sample())
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/rollout_worker.py", line 488, in sample
    batches = [self.input_reader.next()]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 52, in next
    batches = [self.get_data()]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 95, in get_data
    item = next(self.rollout_provider)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 315, in _env_runner
    soft_horizon, no_done_at_end)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 461, in _process_observations
    episode.batch_builder.postprocess_batch_so_far(episode)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sample_batch_builder.py", line 152, in postprocess_batch_so_far
    pre_batch, other_batches, episode)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/policy/torch_policy_template.py", line 109, in postprocess_trajectory
    episode)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/ppo/ppo_tf_policy.py", line 191, in postprocess_ppo_gae
    use_gae=policy.config["use_gae"])
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/postprocessing.py", line 45, in compute_advantages
    traj[key] = np.stack(rollout[key])
  File "<__array_function__ internals>", line 6, in stack
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/shape_base.py", line 420, in stack
    arrays = [asanyarray(arr) for arr in arrays]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/shape_base.py", line 420, in <listcomp>
    arrays = [asanyarray(arr) for arr in arrays]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/_asarray.py", line 138, in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/torch/tensor.py", line 486, in __array__
    return self.numpy()
TypeError: can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.
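The decisive frames are the last ones: compute_advantages calls np.stack on a rollout column that still holds CUDA tensors, and NumPy's asanyarray path ends up in Tensor.__array__, which calls self.numpy(). A minimal standalone sketch of that failure mode (it assumes a CUDA-capable machine):

import numpy as np
import torch

t = torch.zeros(3, device="cuda")
# np.stack() calls asanyarray() on each element; for a torch.Tensor this
# invokes Tensor.__array__(), i.e. self.numpy(), which raises for GPU tensors:
np.stack([t, t])  # TypeError: can't convert CUDA tensor to numpy. ...
# Copying to host memory first avoids the error:
np.stack([t.cpu(), t.cpu()])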
For reference, this is the TensorFlow model issue_model_tf.py, where the issue does not occur:
from ray import tune
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.utils import try_import_tf
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel

tf = try_import_tf()

class AdaptedVisionNetwork(TFModelV2):
    """Custom model for policy gradient algorithms."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        super(AdaptedVisionNetwork, self).__init__(obs_space, action_space,
                                                   num_outputs, model_config, name)

        def dense(inp, channels):
            d = tf.keras.layers.Dense(
                channels,
                kernel_initializer=normc_initializer(1.0))(inp)
            r = tf.keras.layers.Dropout(0.2)(d)
            return tf.keras.layers.Activation("relu")(r)

        def conv(inp, size, strides, channels):
            c = tf.keras.layers.Conv2D(
                filters=channels,
                kernel_size=size,
                strides=strides,
                padding="same",
                kernel_initializer=normc_initializer(1.0))(inp)
            bn = tf.keras.layers.BatchNormalization()(c)
            return tf.keras.layers.Activation("relu")(bn)

        self.inputs = tf.keras.layers.Input(shape=(42, 42, 2), name="observations")
        s = conv(self.inputs, 8, 4, 16)
        s = conv(s, 4, 2, 32)
        f = tf.keras.layers.Flatten()(s)
        d = dense(f, 256)
        layer_out = tf.keras.layers.Dense(
            num_outputs,
            name="my_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(d)
        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(d)
        self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out])
        self.register_variables(self.base_model.variables)

    def forward(self, input_dict, state, seq_lens):
        model_out, self._value_out = self.base_model(input_dict["obs"]["states"][0])  # just for demonstration purposes
        return model_out, state

    def value_function(self):
        return tf.reshape(self._value_out, [-1])
Thanks for filing this issue @janblumenkamp!
Could you try this PR and see whether it fixes your problem?
https://github.com/ray-project/ray/pull/7445
Thanks for the quick PR! Unfortunately, it does not work, since the torch tensors in the stats dict are contained in numpy arrays (e.g. {"t": np.array([torch.tensor([0])], dtype=object)}), which are not traversed by the dm-tree package :( If a standard Python list were used instead of the numpy array of objects, this would work. I tried checking whether the item in the mapping function is an instance of a numpy array and, if so, returning it as a list, but apparently map_structure does not allow modifying structures on the fly.
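To illustrate the traversal behavior: dm-tree recurses into dicts, lists, and tuples, but any numpy array, including an object-dtype one, is a single leaf, so the tensors inside it are never visited. A minimal sketch, assuming the dm-tree package is installed (imported as tree):

import numpy as np
import torch
import tree  # dm-tree

mark = lambda leaf: ("visited", leaf)
# A plain list is traversed element by element:
print(tree.map_structure(mark, {"t": [torch.tensor([0])]}))
# An object-dtype numpy array is handed to the mapping function
# as one opaque leaf; the tensor inside is never mapped:
print(tree.map_structure(mark, {"t": np.array([torch.tensor([0])], dtype=object)}))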
Is there any reason why stats uses a numpy array of objects instead of a list? Can this be changed?
Cool, thanks for checking. Yeah, this doesn't look right. We shouldn't use np.array of objects. There is no reason for that. I'll check again.
Ok, so I ran the entire example you provided, and after starting the client I do get lots of actions displayed on the screen. Are you sure you have the latest version of Ray (pip install -U ray ray[rllib], plus pip install -U [nightly wheel for your platform])?
I'm on macOS:
pytorch==1.4.0
python==3.7.6
numpy==1.17.4
ray==0.9.0dev0
Server output:
...
127.0.0.1 - - [04/Mar/2020 15:41:21] "POST / HTTP/1.1" 200 -
127.0.0.1 - - [04/Mar/2020 15:41:21] "POST / HTTP/1.1" 200 -
127.0.0.1 - - [04/Mar/2020 15:41:21] "POST / HTTP/1.1" 200 -
...
Client output:
...
{'a_0': 3, 'a_1': 0, 'a_2': 4}
{'a_0': 2, 'a_1': 4, 'a_2': 0}
{'a_0': 3, 'a_1': 1, 'a_2': 3}
{'a_0': 4, 'a_1': 3, 'a_2': 1}
{'a_0': 2, 'a_1': 1, 'a_2': 3}
{'a_0': 4, 'a_1': 2, 'a_2': 1}
{'a_0': 0, 'a_1': 1, 'a_2': 0}
{'a_0': 4, 'a_1': 4, 'a_2': 3}
{'a_0': 1, 'a_1': 2, 'a_2': 0}
{'a_0': 0, 'a_1': 2, 'a_2': 4}
{'a_0': 0, 'a_1': 0, 'a_2': 2}
{'a_0': 2, 'a_1': 4, 'a_2': 1}
{'a_0': 1, 'a_1': 3, 'a_2': 4}
{'a_0': 0, 'a_1': 2, 'a_2': 0}
{'a_0': 0, 'a_1': 1, 'a_2': 1}
{'a_0': 0, 'a_1': 4, 'a_2': 2}
{'a_0': 0, 'a_1': 2, 'a_2': 2}
{'a_0': 4, 'a_1': 4, 'a_2': 0}
So what I meant was: After you upgrade to the latest RLlib, add this PR from today:
https://github.com/ray-project/ray/pull/7445
I may have used an RLlib version that was one or two days old, sorry.
With the latest RLlib (without your pull request applied) in a newly set up virtualenv, I get a segmentation fault:
127.0.0.1 - - [04/Mar/2020 16:20:00] "POST / HTTP/1.1" 200 -
*** Aborted at 1583338800 (unix time) try "date -d @1583338800" if you are using GNU date ***
PC: @ 0x0 (unknown)
*** SIGSEGV (@0x1) received by PID 30454 (TID 0x7f70d6e60740) from PID 1; stack trace: ***
@ 0x7f70d68c2f20 (unknown)
@ 0x7f70d6a125a1 (unknown)
@ 0x7f70d68e14d3 _IO_vfprintf
@ 0x7f70d690c910 vsnprintf
@ 0x7f707f176154 torch::formatMessage()
@ 0x7f707f176476 torch::TypeError::TypeError()
@ 0x7f707f469cee torch::utils::(anonymous namespace)::new_with_tensor()
@ 0x7f707f46cff7 torch::utils::legacy_tensor_ctor()
@ 0x7f707f2ba496 THPVariable_pynew()
@ 0x551b15 (unknown)
@ 0x5aa6ec _PyObject_FastCallKeywords
@ 0x50abb3 (unknown)
@ 0x50c5b9 _PyEval_EvalFrameDefault
@ 0x508245 (unknown)
@ 0x50a080 (unknown)
@ 0x50aa7d (unknown)
@ 0x50c5b9 _PyEval_EvalFrameDefault
@ 0x508245 (unknown)
@ 0x509642 _PyFunction_FastCallDict
@ 0x595311 (unknown)
@ 0x54a6ff (unknown)
@ 0x551b81 (unknown)
@ 0x5a067e PyObject_Call
@ 0x50d966 _PyEval_EvalFrameDefault
@ 0x508245 (unknown)
@ 0x50a080 (unknown)
@ 0x50aa7d (unknown)
@ 0x50c5b9 _PyEval_EvalFrameDefault
@ 0x508245 (unknown)
@ 0x50a080 (unknown)
@ 0x50aa7d (unknown)
@ 0x50d390 _PyEval_EvalFrameDefault
Segmentation fault (core dumped)
But I doubt it has anything to do with this issue. I will try Python 3.7 then.
Ok, please keep us posted. Yeah, this went in just a few days ago, so it's important you really use the very latest build.
Nope, no matter what I try, I keep getting this exception (the one from my previous comment) with the most recent builds, both with Python 3.6 and 3.7. Could someone else who uses Linux verify this? Maybe something else is wrong with my setup?
Confirmed on my setup; I _built_ Ray 'recently' (March 3, 2020) at /ray:
user@hostname$ uname -a
Linux hostname 5.3.0-40-generic #32~18.04.1-Ubuntu SMP Mon Feb 3 14:05:59 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
user@hostname$ python --version
Python 3.7.4
Python package versions:
from importlib import import_module
import yaml

version_lookup = {'torch': '', 'tensorflow': '', 'numpy': '', 'ray': ''}
for module in version_lookup.keys():
    imprt = import_module(module)
    version_lookup[module] = imprt.__version__
print(yaml.dump(version_lookup))
numpy: 1.16.6
ray: 0.9.0.dev0
tensorflow: 1.14.1
torch: 1.4.0
Configuration/setup:
import ray
from ray.rllib.agents.ppo.ddppo import DDPPOTrainer, DEFAULT_CONFIG
import yaml

config = DEFAULT_CONFIG.copy()
with open('/ray/rllib/tuned_examples/atari-ddppo.yaml', 'r') as f:
    tuned_example = yaml.safe_load(f)
for key, val in tuned_example['atari-ddppo']['config'].items():
    config[key] = val
config['num_workers'] = 1
Ray calls:
ray.init(address="hostIP:6379", redis_password='redis_password', ignore_reinit_error=True)
agent = DDPPOTrainer(config, 'BreakoutNoFrameskip-v4')
agent.train()
Ray errors:
2020-03-05 08:57:26,745 INFO trainer.py:423 -- Tip: set 'eager': true or the --eager flag to enable TensorFlow eager execution
2020-03-05 08:57:26,759 INFO trainer.py:580 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
---------------------------------------------------------------------------
RayTaskError(TypeError) Traceback (most recent call last)
<ipython-input-2-b53e91f7fd27> in <module>
1 ray.init(address="hostIP:6379", redis_password='redis_password', ignore_reinit_error=True)
2 agent = DDPPOTrainer(config, 'BreakoutNoFrameskip-v4')
----> 3 agent.train()
/ray/python/ray/rllib/agents/trainer.py in train(self)
495 "continue training without the failed worker, set "
496 "`'ignore_worker_failures': True`.")
--> 497 raise e
498 except Exception as e:
499 time.sleep(0.5) # allow logs messages to propagate
/ray/python/ray/rllib/agents/trainer.py in train(self)
484 for _ in range(1 + MAX_WORKER_FAILURE_RETRIES):
485 try:
--> 486 result = Trainable.train(self)
487 except RayError as e:
488 if self.config["ignore_worker_failures"]:
/ray/python/ray/tune/trainable.py in train(self)
252 """
253 start = time.time()
--> 254 result = self._train()
255 assert isinstance(result, dict), "_train() needs to return a dict."
256
/ray/python/ray/rllib/agents/trainer_template.py in _train(self)
137 start = time.time()
138 while True:
--> 139 fetches = self.optimizer.step()
140 if after_optimizer_step:
141 after_optimizer_step(self, fetches)
/ray/python/ray/rllib/optimizers/torch_distributed_data_parallel_optimizer.py in step(self)
64 self.expected_batch_size, self.num_sgd_iter,
65 self.sgd_minibatch_size, self.standardize_fields)
---> 66 for w in self.workers.remote_workers()
67 ])
68 for info, count in results:
/ray/python/ray/worker.py in get(object_ids, timeout)
1502 worker.core_worker.dump_object_store_memory_usage()
1503 if isinstance(value, RayTaskError):
-> 1504 raise value.as_instanceof_cause()
1505 else:
1506 raise value
RayTaskError(TypeError): ray::RolloutWorker.sample_and_learn() (pid=4004, ip=IP)
  File "python/ray/_raylet.pyx", line 448, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 426, in ray._raylet.execute_task.function_executor
  File "/ray/python/ray/rllib/evaluation/rollout_worker.py", line 652, in sample_and_learn
    batch = self.sample()
  File "/ray/python/ray/rllib/evaluation/rollout_worker.py", line 489, in sample
    batches = [self.input_reader.next()]
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 53, in next
    batches = [self.get_data()]
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 96, in get_data
    item = next(self.rollout_provider)
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 316, in _env_runner
    soft_horizon, no_done_at_end)
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 462, in _process_observations
    episode.batch_builder.postprocess_batch_so_far(episode)
  File "/ray/python/ray/rllib/evaluation/sample_batch_builder.py", line 153, in postprocess_batch_so_far
    pre_batch, other_batches, episode)
  File "/ray/python/ray/rllib/policy/torch_policy_template.py", line 109, in postprocess_trajectory
    episode)
  File "/ray/python/ray/rllib/agents/ppo/ppo_tf_policy.py", line 191, in postprocess_ppo_gae
    use_gae=policy.config["use_gae"])
  File "/ray/python/ray/rllib/evaluation/postprocessing.py", line 45, in compute_advantages
    traj[key] = np.stack(rollout[key])
  File "/usr/local/lib/python3.7/site-packages/numpy/core/shape_base.py", line 410, in stack
    arrays = [asanyarray(arr) for arr in arrays]
  File "/usr/local/lib/python3.7/site-packages/numpy/core/shape_base.py", line 410, in <listcomp>
    arrays = [asanyarray(arr) for arr in arrays]
  File "/usr/local/lib/python3.7/site-packages/numpy/core/numeric.py", line 591, in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
  File "/usr/local/lib/python3.7/site-packages/torch/tensor.py", line 486, in __array__
    return self.numpy()
TypeError: can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.
Could you post the contents of your lines 100-120 in rllib/policy/torch_policy_template.py?
Do you have calls to convert_to_non_torch_type in there?
In that function, we move the tensors to cpu() and then numpy'ize them:
def convert_to_non_torch_type(stats):
    # The mapping function used to numpyize torch Tensors.
    def mapping(item):
        if isinstance(item, torch.Tensor):
            return item.cpu().item() if len(item.size()) == 0 else \
                item.cpu().numpy()
        else:
            return item

    return tree.map_structure(mapping, stats)
The above changes were done in PR #7445 and have not been merged into master yet!
https://github.com/ray-project/ray/pull/7445
The most recent commit in master that does not throw the segmentation fault for me is fb1c1e2d27348b139bf1b686a1e81b94049d5190. If I apply your patch to that commit, the exception persists, since the 'action_prob' and 'action_logp' keys consist of numpy arrays of tensors, which can't be traversed by the tree library. I can verify whether your PR fixes the problem on later commits once the segmentation fault is no longer thrown.
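One hypothetical workaround for this traversal limitation would be to convert object-dtype numpy arrays into plain lists before handing the stats to dm-tree. The following is only a sketch of that idea (the helper and function names are mine, not from the PR):

import numpy as np
import torch
import tree  # dm-tree

def object_arrays_to_lists(obj):
    # Hypothetical helper: recursively replace object-dtype numpy arrays
    # with plain Python lists so that dm-tree can traverse their elements.
    if isinstance(obj, dict):
        return {k: object_arrays_to_lists(v) for k, v in obj.items()}
    if isinstance(obj, np.ndarray) and obj.dtype == object:
        return [object_arrays_to_lists(x) for x in obj]
    return obj

def convert_stats(stats):
    # Same mapping as convert_to_non_torch_type above, applied after the
    # object arrays have been turned into traversable lists.
    def mapping(item):
        if isinstance(item, torch.Tensor):
            return item.cpu().item() if len(item.size()) == 0 else \
                item.cpu().numpy()
        return item
    return tree.map_structure(mapping, object_arrays_to_lists(stats))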
@sven1977 -- Here are my lines 100-120 (maybe not what you're looking for):
Call:
user@hostname:/ray/rllib/policy$ head -n 120 torch_policy_template.py | tail -n 20
Result:
                              episode=None):
        if not postprocess_fn:
            return sample_batch
        # Do all post-processing always with no_grad().
        # Not using this here will introduce a memory leak (issue #6962).
        with torch.no_grad():
            return postprocess_fn(self, sample_batch, other_agent_batches,
                                  episode)

    @override(TorchPolicy)
    def extra_grad_process(self):
        if extra_grad_process_fn:
            return extra_grad_process_fn(self)
        else:
            return TorchPolicy.extra_grad_process(self)

    @override(TorchPolicy)
    def extra_action_out(self, input_dict, state_batches, model,
                         action_dist=None):
OTOH... Call:
user@hostname:/ray/rllib/policy$ grep -n convert_to_non_torch_type torch_policy_template.py
Result:
8:from ray.rllib.utils.torch_ops import convert_to_non_torch_type
130: return convert_to_non_torch_type(stats_dict)
146: return convert_to_non_torch_type(stats_dict)
Here are the lines preceding the line 130 reference:
    @override(TorchPolicy)
    def extra_action_out(self, input_dict, state_batches, model,
                         action_dist=None):
        with torch.no_grad():
            if extra_action_out_fn:
                stats_dict = extra_action_out_fn(
                    self, input_dict, state_batches, model, action_dist
                )
            else:
                stats_dict = TorchPolicy.extra_action_out(
                    self, input_dict, state_batches, model, action_dist
                )
            return convert_to_non_torch_type(stats_dict)  # THIS IS LINE 130
and the lines preceding the line 146 reference:
    @override(TorchPolicy)
    def extra_grad_info(self, train_batch):
        with torch.no_grad():
            if stats_fn:
                stats_dict = stats_fn(self, train_batch)
            else:
                stats_dict = TorchPolicy.extra_grad_info(self, train_batch)
            return convert_to_non_torch_type(stats_dict)  # THIS IS LINE 146
Now it works for me (meanwhile the PR has been merged into master). Thanks a lot, Sven!