Ray: Using asyncio.wait on a ray ObjectRef can result in InvalidStateError logs

Created on 25 Sep 2020  路  4Comments  路  Source: ray-project/ray

What is the problem?

Versions

$ python --version
Python 3.6.12 :: Anaconda, Inc.

```bash
$ python -c "import ray; print('VERSION:', ray.__version__, 'COMMIT:', ray.__commit__)"
VERSION: 0.8.7 COMMIT: 56810e136587432ed4758d882bf8894b930355e3


#### Context

We use this construct to wait for an actor's result. The actor could fail (e.g. by going OOM) so we also do some periodic health checks on it. `asyncio.wait` is used to check for both the result and the health check concurrently.

#### Problem

When using `asyncio.wait` on a ray ObjectRef which represents an aync actor method call, sometimes (with high probability) the `set_result` callback is ran twice, resulting in an `InvalidStateError`. Since this exception is thrown in the callback itself, the code does not crash but it does result in some spammy error logs.

#### Related

#8841 seems to be related, but I have created a new ticket because:

- I cannot reproduce the original error from that ticket anymore using the latest releases.

```python
import ray
import time
import asyncio

ray.init()
@ray.remote
def f():
    time.sleep(5)

async def test():
    oid = f.remote()
    try:
        await asyncio.wait_for(oid, timeout=1)
    except asyncio.TimeoutError:
        pass
    try:
        await asyncio.wait_for(oid, timeout=1)
    except asyncio.TimeoutError:
        pass

asyncio.get_event_loop().run_until_complete(test())
  • The changes the ticket introduced are not in master anymore (https://github.com/ray-project/ray/pull/8842/files)

Reproduction

import ray
import asyncio
import logging
from typing import NoReturn
from ray.actor import ActorHandle
from contextlib import suppress

logging.basicConfig(level=logging.INFO)
ray.init()

# =============================================================================
# test setup
# =============================================================================

@ray.remote
class AsyncActor:

    async def ping(self) -> str:
        """Health check method.

        Used to determine if the actor is still alive.
        """
        return "OK"

    async def run(self) -> str:
        """Long running job.

        Can fail and can even crash the ray worker process.
        """
        await asyncio.sleep(5)
        return "DONE"


async def periodic_ping(actor_handle: ActorHandle) -> NoReturn:
    while True:
        asyncio.sleep(0.5)
        try:
            await actor_handle.ping.remote()
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logging.exception("Actor is dead!")
            raise

async def run(actor_handle):

    loop = asyncio.get_event_loop()

    try:
        health_check = loop.create_task(periodic_ping(actor_handle))
        get_result = actor_handle.run.remote()
        done, pending = await asyncio.wait(
            [get_result, health_check],
            return_when=asyncio.FIRST_COMPLETED,
        )

        # The health check can only raise, so first task in done state
        # is the response from the ResultsActor.
        result = next(iter(done)).result()

    finally:
        with suppress(asyncio.CancelledError):
            health_check.cancel()

    return result

# =============================================================================
# run the test
# =============================================================================

async def test():

    actor_handle = AsyncActor.remote()

    # This actually completes fine, but somehow the `set_future` callback
    # can run twice, resulting in an ERROR log statement. It doesn't happen
    # everytime, but should be easily reproducible by running the script a few
    # times.
    #
    #    ERROR:asyncio:Exception in callback async_set_result.<locals>.set_future()
    #    handle: <Handle async_set_result.<locals>.set_future()>
    #    Traceback (most recent call last):
    #    File ".../lib/python3.6/asyncio/events.py", line 145, in _run
    #       self._callback(*self._args)
    #    File "python/ray/_raylet.pyx", line 1361, in ray._raylet.async_set_result.set_future
    #    asyncio.base_futures.InvalidStateError: invalid state

    result = await run(actor_handle)

    print(result)

if __name__ == "__main__":

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

If we cannot run your script, we cannot fix your issue.

  • [:heavy_check_mark:] I have verified my script runs in a clean environment and reproduces the issue.
  • [:heavy_check_mark:] I have verified the issue also occurs with the latest wheels.

Some clarification:
The issue does not manifest itself every time the script is run. It can take a few times.
Only tested with the latest _stable_ wheels.

P2 bug core

Most helpful comment

cc @ijrsvt @simon-mo Isn't it supposed to be fixed right? Was it pushed after 0.8.7?

@robbertvc Is it possible to try the latest commit and see if it still occurs?

All 4 comments

cc @ijrsvt @simon-mo Isn't it supposed to be fixed right? Was it pushed after 0.8.7?

@robbertvc Is it possible to try the latest commit and see if it still occurs?

@rkooo567 Hi Sang, unfortunately we can still reproduce it with the official ray 1.0.0 release. Subjectively, it seems to take a bit longer to reproduce though, this time it took around 20 runs of the script.

@simon-mo Can we close after @robbertvc tries this one more time? @robbertvc Would you mind trying the latest master and see if it is reproduced?

Sure. This is automatically closed by the PR merge.

Was this page helpful?
0 / 5 - 0 ratings