```bash
$ python --version
Python 3.6.12 :: Anaconda, Inc.
$ python -c "import ray; print('VERSION:', ray.__version__, 'COMMIT:', ray.__commit__)"
VERSION: 0.8.7 COMMIT: 56810e136587432ed4758d882bf8894b930355e3
```
#### Context
We use the construct in the reproduction script to wait for an actor's result. The actor can fail (e.g. by running out of memory), so we also run periodic health checks on it. `asyncio.wait` is used to wait for the result and the health check concurrently, returning as soon as either one finishes.
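The same pattern can be sketched with plain asyncio, without Ray. This is a minimal sketch under stated assumptions, not our production code: `job` and `ping` are hypothetical stand-ins for `actor_handle.run.remote()` and `actor_handle.ping.remote()`, `run_with_health_check` is an illustrative name, and `asyncio.run` requires Python 3.7+. It also shows that `Task.cancel()` only requests cancellation; the task has to be awaited before its `CancelledError` is actually raised (and can be suppressed).

```python
import asyncio
from contextlib import suppress
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def periodic_ping(ping: Callable[[], Awaitable[object]], interval: float) -> None:
    # Ping forever. An exception from ping() ends this task, which puts it in
    # asyncio.wait's `done` set. CancelledError is deliberately not caught,
    # so cancel() really stops the loop.
    while True:
        await ping()
        await asyncio.sleep(interval)


async def run_with_health_check(
    job: Callable[[], Awaitable[T]],
    ping: Callable[[], Awaitable[object]],
    interval: float = 0.05,
) -> T:
    result = asyncio.ensure_future(job())
    health_check = asyncio.ensure_future(periodic_ping(ping, interval))
    try:
        done, _ = await asyncio.wait(
            [result, health_check], return_when=asyncio.FIRST_COMPLETED
        )
        if health_check in done:
            health_check.result()  # a finished health check can only raise
        return result.result()
    finally:
        result.cancel()  # no-op if the job already finished
        if not health_check.done():
            health_check.cancel()
            # cancel() only requests cancellation; awaiting the task lets it
            # unwind and consumes the CancelledError it raises.
            with suppress(asyncio.CancelledError):
                await health_check


# Hypothetical stand-ins for the actor methods.
async def job() -> str:
    await asyncio.sleep(0.3)  # plays the role of actor_handle.run.remote()
    return "DONE"


async def ping() -> str:
    return "OK"  # plays the role of actor_handle.ping.remote()


if __name__ == "__main__":
    print(asyncio.run(run_with_health_check(job, ping)))  # prints DONE
```

Passing the job and the health check in as callables keeps the sketch testable: a `ping` that raises makes `run_with_health_check` re-raise that error instead of waiting for the job.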
#### Problem
When `asyncio.wait` is used on a Ray `ObjectRef` that represents an async actor method call, the callback that sets the result on the awaiting future (`async_set_result.<locals>.set_future` in the traceback) is often, but not always, run twice, resulting in an `InvalidStateError`. Because the exception is raised inside the callback itself, the code does not crash, but it does produce spammy error logs.
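The error itself is plain asyncio behaviour: a `Future` can only be resolved once, and a second `set_result` call raises `InvalidStateError`. Because such callbacks run through `loop.call_soon`, the event loop catches the exception and passes it to its exception handler (by default an "Exception in callback" log line) instead of propagating it to the awaiting code, which matches the "no crash, but error logs" symptom. The sketch below reproduces this without Ray and shows a common defensive pattern, checking `done()` first. `set_result_once` is a hypothetical helper for illustration only; it is not Ray's code, and this thread does not say how Ray fixed the issue. Requires Python 3.7+.

```python
import asyncio
from typing import List, Tuple


def set_result_once(fut: asyncio.Future, value: object) -> None:
    """Hypothetical guard: resolve `fut` only if it is still pending."""
    if not fut.done():
        fut.set_result(value)


async def main() -> Tuple[object, object, List[str]]:
    loop = asyncio.get_running_loop()
    logged: List[str] = []  # exceptions reported to the loop's exception handler

    def handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
        logged.append(type(context["exception"]).__name__)

    loop.set_exception_handler(handler)

    # Unguarded: two callbacks resolve the same future. The second raises
    # InvalidStateError inside the event loop, which reports it to the
    # exception handler rather than to the code awaiting the future.
    fut = loop.create_future()
    loop.call_soon(fut.set_result, "first")
    loop.call_soon(fut.set_result, "second")
    value = await fut

    # Guarded: the duplicate callback becomes a silent no-op.
    guarded = loop.create_future()
    loop.call_soon(set_result_once, guarded, "first")
    loop.call_soon(set_result_once, guarded, "second")
    guarded_value = await guarded

    return value, guarded_value, logged


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('first', 'first', ['InvalidStateError'])
```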
#### Related
#8841 seems related, but I have opened a new ticket because:
- I can no longer reproduce the original error from that ticket with the latest releases, for example with the following script:
```python
import asyncio
import time

import ray

ray.init()


@ray.remote
def f():
    time.sleep(5)


async def test():
    oid = f.remote()
    try:
        await asyncio.wait_for(oid, timeout=1)
    except asyncio.TimeoutError:
        pass
    try:
        await asyncio.wait_for(oid, timeout=1)
    except asyncio.TimeoutError:
        pass


asyncio.get_event_loop().run_until_complete(test())
```
#### Reproduction
```python
import asyncio
import logging
from contextlib import suppress
from typing import NoReturn

import ray
from ray.actor import ActorHandle

logging.basicConfig(level=logging.INFO)
ray.init()


# =============================================================================
# test setup
# =============================================================================
@ray.remote
class AsyncActor:
    async def ping(self) -> str:
        """Health check method.

        Used to determine whether the actor is still alive.
        """
        return "OK"

    async def run(self) -> str:
        """Long-running job.

        Can fail and can even crash the Ray worker process.
        """
        await asyncio.sleep(5)
        return "DONE"


async def periodic_ping(actor_handle: ActorHandle) -> NoReturn:
    while True:
        # Wait between health checks.
        await asyncio.sleep(0.5)
        try:
            await actor_handle.ping.remote()
        except asyncio.CancelledError:
            # Let cancellation stop the loop. On Python 3.6/3.7,
            # CancelledError subclasses Exception, so it must be
            # handled before the generic clause below.
            raise
        except Exception:
            logging.exception("Actor is dead!")
            raise


async def run(actor_handle: ActorHandle) -> str:
    loop = asyncio.get_event_loop()
    health_check = loop.create_task(periodic_ping(actor_handle))
    try:
        get_result = actor_handle.run.remote()
        done, pending = await asyncio.wait(
            [get_result, health_check],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # The health check can only raise, so the first task in the done
        # set is the response from the actor's `run` method.
        result = next(iter(done)).result()
    finally:
        # cancel() only requests cancellation; awaiting the task is what
        # raises (and here suppresses) the resulting CancelledError.
        health_check.cancel()
        with suppress(asyncio.CancelledError):
            await health_check
    return result


# =============================================================================
# run the test
# =============================================================================
async def test():
    actor_handle = AsyncActor.remote()
    # This actually completes fine, but sometimes the `set_future` callback
    # runs twice, resulting in an ERROR log statement. It doesn't happen
    # every time, but it should be easy to reproduce by running the script
    # a few times.
    #
    # ERROR:asyncio:Exception in callback async_set_result.<locals>.set_future()
    # handle: <Handle async_set_result.<locals>.set_future()>
    # Traceback (most recent call last):
    #   File ".../lib/python3.6/asyncio/events.py", line 145, in _run
    #     self._callback(*self._args)
    #   File "python/ray/_raylet.pyx", line 1361, in ray._raylet.async_set_result.set_future
    # asyncio.base_futures.InvalidStateError: invalid state
    result = await run(actor_handle)
    print(result)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())
```
Some clarifications:
- The issue does not manifest every time the script is run; it can take a few runs.
- I have only tested with the latest _stable_ wheels.
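Because the error only appears on some runs, a small harness that reruns the script and counts how often the error is logged makes comparisons between releases less subjective. This is a hypothetical helper, not part of Ray: `count_failing_runs` is an illustrative name, and it assumes the error text reaches stderr, which is where `logging.basicConfig` and asyncio's logger write by default.

```python
import subprocess
import sys
from typing import Sequence


def count_failing_runs(cmd: Sequence[str], runs: int, marker: str = "InvalidStateError") -> int:
    """Run `cmd` `runs` times and count the runs whose stderr contains `marker`."""
    failures = 0
    for _ in range(runs):
        proc = subprocess.run(
            list(cmd),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,  # decode stderr to str (works on 3.6)
        )
        if marker in proc.stderr:
            failures += 1
    return failures


# Example, assuming the reproduction script was saved as "repro.py"
# (a hypothetical filename):
#   failures = count_failing_runs([sys.executable, "repro.py"], runs=20)
#   print(f"{failures}/20 runs logged InvalidStateError")
```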
cc @ijrsvt @simon-mo Isn't this supposed to be fixed already? Was the fix merged after 0.8.7?
@robbertvc Is it possible to try the latest commit and see if it still occurs?
@rkooo567 Hi Sang, unfortunately we can still reproduce it with the official Ray 1.0.0 release. Subjectively, it seems to take a bit longer to reproduce; this time it took around 20 runs of the script.
@simon-mo Can we close this after @robbertvc tries it one more time? @robbertvc Would you mind trying the latest master to see whether it still reproduces?
Sure. This is automatically closed by the PR merge.