Ray: [rllib] Support reset_args in the environment reset() function?

Created on 27 Sep 2018  ·  11 Comments  ·  Source: ray-project/ray

During training, at each iteration we may want to sample a specific batch of tasks and reset the environment to a task from that batch, rather than to a random task sampled from the task distribution.

Can we implement this with the current version? Or could we add an extra argument to the reset function, as in https://github.com/cbfinn/maml_rl/blob/master/rllab/envs/normalized_env.py#L51?
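For concreteness, a minimal sketch of the kind of reset interface I have in mind (TaskEnv, its spaces, and the reset_args parameter are made up for illustration, not taken from maml_rl or rllib):

import gym
import numpy as np


class TaskEnv(gym.Env):
    """Hypothetical environment whose reset() can be pinned to a task."""

    def __init__(self, task_distribution):
        self.task_distribution = task_distribution
        self.observation_space = gym.spaces.Box(-1.0, 1.0, shape=(4,), dtype=np.float32)
        self.action_space = gym.spaces.Discrete(2)
        self.task = None

    def reset(self, reset_args=None):
        # Reset to the given task if one is supplied; otherwise fall back to
        # sampling a random task from the distribution (the current behavior).
        self.task = reset_args if reset_args is not None else self.task_distribution()
        return self.observation_space.sample()

    def step(self, action):
        return self.observation_space.sample(), 0.0, False, {}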

Labels: enhancement, question, rllib, stale

All 11 comments

There isn't a way to do this right now, though you could hack something together with global variables.

One way to do this is with a Ray named actor:

import ray
from ray.experimental import named_actors

@ray.remote
class ResetArgsHolder:
    def __init__(self, args):
        self.args = args
    def get(self):
        return self.args
    def set(self, args):
        self.args = args

# on the driver
args_actor = ResetArgsHolder.remote(initial_args)
named_actors.register_actor("reset_args", args_actor)
# to set new reset args
args_actor.set.remote(new_args)

# in your env
current_reset_args = ray.get(named_actors.get_actor("reset_args").get.remote())
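And roughly how the env side could use it, as a sketch (the wrapper class, base_env, and set_task are placeholders for however your own env applies a task; the named actor is the one registered above):

import ray
from ray.experimental import named_actors

class TaskSettableEnv:
    def __init__(self, base_env):
        self.base_env = base_env

    def reset(self):
        # Pull the latest reset args from the shared named actor and apply
        # them to the wrapped env before resetting it.
        holder = named_actors.get_actor("reset_args")
        reset_args = ray.get(holder.get.remote())
        self.base_env.set_task(reset_args)
        return self.base_env.reset()

    def step(self, action):
        return self.base_env.step(action)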

Thanks for your solution. It helps me a lot!

I found another problem.

In the function ray.rllib.evaluation.sampler._env_runner, we only invoke env.reset after a trajectory is done (in complete_episodes mode).

Thus, after the last optimizer.step has finished, even if we modify the reset_args of the environments of a specific remote evaluator via ResetArgsHolder.set, the first returned trajectories may still be sampled with the old reset_args.

Do I understand the _env_runner correctly?

Hm, I don't think reset is called until the next round in synchronous sampling mode, since the reset call is after the yield of the batch in the sampler. Is this not the case?

I mean the case of num_envs_per_worker > 1.

Let's say we have an evaluator with horizon = 10, batch_steps = 10, and num_envs_per_worker = 3. In complete_episodes mode, each call to evaluator.sample generates at least 30 steps.

For example, at the first iteration with reset_args = A, the sampling proceeds as follows:
[env_1_7, env_2_8, env_3_9, env_1_10], where env_x_y means a batch of y steps sampled from environment x. Then we use these 7 + 8 + 9 + 10 = 34 steps to optimize the policy. At that point, the statuses of the three environments are

  • env_1 has just reset to A;
  • env_2 has sampled 9 steps and the observation is obs_9;
  • env_3 has sampled 8 steps and the observation is obs_8.

At the second iteration, we first set ResetArgsHolder to B. However, when we call evaluator.sample, the three environments proceed as follows:

  • env_1 continues sampling with reset_arg = A;
  • env_2 continues sampling from obs_9 but with the updated policy;
  • env_3 continues sampling from obs_8 but with the updated policy.

Therefore, at the second iteration, reset_args = B only takes effect after each environment finishes its current trajectory. On the other hand, the samples from env_2 and env_3 are slightly off-policy.

Have I explained this clearly, and is my understanding correct?

I see. I agree that in complete_episodes mode, sampling can be a bit off-policy in the vector case, even in synchronous mode. This is a known problem:
https://github.com/ray-project/ray/blob/6531eed2d0b17e26bab2ea60fb9de6e659610098/python/ray/rllib/evaluation/policy_evaluator.py#L131

In your case, I think even truncate_episodes mode will have the same issue, since we do the resets after yield is called for each env.

One workaround may be to "flush" these sample batches after changing the reset args. You can do this by running agent.optimizer.foreach_evaluator(lambda ev: ev.sample()).
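Concretely, something like this at the driver (a sketch; agent, args_actor, and new_args are from the snippets above, and the flushed batches are simply discarded):

import ray

# 1. Point the shared holder at the new reset args.
ray.get(args_actor.set.remote(new_args))

# 2. "Flush": pull one sample batch from every evaluator and throw it away,
#    so episodes started under the old reset args are drained out.
agent.optimizer.foreach_evaluator(lambda ev: ev.sample())

# 3. Later training iterations now collect data under the new args.
result = agent.train()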

I expect the sampling to meet two conditions: it uses the right reset_args, and it contains no off-policy data. Originally, I wanted to use ev.sample() and filter out the first batch from each environment to satisfy both conditions, but a batch carries no information about its env_id.

And just filtering out batches according to their reset_args will still lead to off-policy data.

So is it possible to reset the sampler? Or, if I want to reset it manually, what should I pay attention to? For example:

  • set a new async_env, or call async_vector_env.try_reset for each environment;
  • set a new self.rollout_provider = _env_runner for the sampler.

Is there anything else?

Another reason I want to reset the sampler is that during some tests, if I interrupt the code, the next call to ev.sample() always raises a StopIteration exception.

I use the following code to reset the sampling:

# Imports assume the module layout of the Ray version I am using
# (PolicyEvaluator/SyncSampler/_env_runner under ray.rllib.evaluation,
# _VectorEnvToAsync under ray.rllib.env.async_vector_env).
from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator
from ray.rllib.evaluation.sampler import SyncSampler, _env_runner
from ray.rllib.env.async_vector_env import _VectorEnvToAsync


class NewPolicyEvaluator(PolicyEvaluator):
    def reset_sample(self):
        async_env = self.async_env
        sampler = self.sampler
        batch_mode = self.batch_mode
        # only the synchronous, vectorized, complete_episodes case is handled
        if not isinstance(async_env, _VectorEnvToAsync) \
                or not isinstance(sampler, SyncSampler) \
                or batch_mode != "complete_episodes":
            raise NotImplementedError

        # reset async_env
        async_env.new_obs = async_env.vector_env.vector_reset()
        async_env.cur_rewards = [None for _ in range(async_env.num_envs)]
        async_env.cur_dones = [False for _ in range(async_env.num_envs)]
        async_env.cur_infos = [None for _ in range(async_env.num_envs)]

        # reset sampler by rebuilding its rollout generator
        sampler.async_vector_env = async_env
        sampler.rollout_provider = _env_runner(
            sampler.async_vector_env, sampler.extra_batches.put,
            sampler.policies, sampler.policy_mapping_fn,
            sampler.unroll_length, sampler.horizon,
            sampler._obs_filters, False, False, self.tf_sess)
        # drain any stale metrics and extra batches
        sampler.get_metrics()
        sampler.get_extra_batches()

    def sample(self):
        self.reset_sample()
        return PolicyEvaluator.sample(self)

With this sampler reset enabled, my code converges. In addition, the change makes A2C training more stable. I'll post some results later.

Unfortunately, I find that the named-actor solution suggested above does not work for me. The named actor cannot be called from different places, similar to #2115 and this comment: https://github.com/ray-project/ray/pull/2129#issuecomment-391619687.

Maybe store the named actor handle in a global variable or something, to avoid getting it multiple times?
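For example, something like this in the env module (a sketch; get_reset_args is a hypothetical helper, and "reset_args" is the named actor registered earlier):

import ray
from ray.experimental import named_actors

# Cache the actor handle at module level so each worker process looks the
# named actor up only once, instead of on every env.reset().
_reset_args_holder = None


def get_reset_args():
    global _reset_args_holder
    if _reset_args_holder is None:
        _reset_args_holder = named_actors.get_actor("reset_args")
    return ray.get(_reset_args_holder.get.remote())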

For now, I just put the actor handle in agent.config["env_config"], something like ParameterServerActor.

I also tried putting the pickled string of the actor in agent.config["env_config"] to avoid the error in #3166, but that runs into the same issue as #2115.
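Roughly what that looks like, as a sketch (MyTaskEnv is hypothetical, ResetArgsHolder and initial_args are from the snippet earlier in the thread, and the actual reset logic is omitted):

import ray

# driver side: create the holder once and pass its handle to every env
# through env_config (the rest of the agent config is omitted)
args_actor = ResetArgsHolder.remote(initial_args)
config = {"env_config": {"reset_args_holder": args_actor}}


# env side: a hypothetical env that takes the handle out of env_config
class MyTaskEnv:
    def __init__(self, env_config):
        self.holder = env_config["reset_args_holder"]
        self.task = None

    def reset(self):
        # read the current reset args directly from the shared actor handle
        self.task = ray.get(self.holder.get.remote())
        # reset the underlying environment to self.task here and return the
        # initial observation; omitted in this sketch
        raise NotImplementedError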

Hi, I'm a bot from the Ray team :)

To help human contributors focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.
