A thread-local WorkerThreadContext is maintained for each core worker thread and is used to (1) get the currently executing task, (2) maintain a put index counting the object puts in the current task, used to deterministically generate put object IDs, and (3) maintain a task index counting the tasks submitted from the current task, used to generate task IDs and to set parent counters on task specs. This works fine for the thread-based worker model, where only one task executes on a thread at a time and tasks run to completion: CoreWorker::ExecuteTask sets this state at the beginning of task execution and resets it at the end, and the underlying Python task executes on the same thread, so the current task and the put index are correctly maintained for this core worker thread.
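To make the role of the put index concrete, here is a minimal, purely illustrative sketch of deriving a put object ID from the current task ID plus a per-task put index (hypothetical derivation; Ray's actual object ID layout differs in detail):

```python
import hashlib

def put_object_id(task_id: bytes, put_index: int) -> bytes:
    # Illustrative only: the same (task_id, put_index) pair always yields
    # the same object ID, which is why the put index must be tracked
    # per task rather than per thread.
    suffix = put_index.to_bytes(4, "little")
    return hashlib.sha1(task_id + suffix).digest()
```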
However, when executing tasks on async Python actors, the Python task runs on an event loop whose thread differs from the core worker fiber runner thread: the latter is where CoreWorker::ExecuteTask runs and (re)sets the current task, while the former is where any subtask submissions and the CoreWorker::Put and CoreWorker::Create calls for put objects happen within an executing async task, incrementing the task and put indices. This thread split results in:

- put object IDs being generated with the wrong (stale or unset) task ID, and a put index that keeps incrementing across distinct async tasks; and
- subtasks of async tasks having the wrong parent task ID and parent counter set on their task specs.
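As a minimal, self-contained illustration (hypothetical names, not Ray's actual internals) of why thread-local state breaks here: state set on the "core worker" thread is invisible to the event loop thread that actually runs the coroutine.

```python
import asyncio
import threading

# Stand-in for the thread-local WorkerThreadContext.
worker_context = threading.local()

def execute_task(task_id):
    # Runs on the "core worker" thread, as CoreWorker::ExecuteTask would.
    worker_context.current_task_id = task_id
    worker_context.put_index = 0

async def async_task_body():
    # Runs on the event loop thread; the thread-local set above is invisible.
    print(getattr(worker_context, "current_task_id", None))  # prints None

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

execute_task("task-1")  # the main thread plays the core worker thread
asyncio.run_coroutine_threadsafe(async_task_body(), loop).result()
```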
Ray version and other system information (Python version, TensorFlow version, OS):
Ray version: Current master
```python
import asyncio

import ray

ray.init()


@ray.remote(num_cpus=0)
class SignalActor:
    def __init__(self):
        self.ready_event = asyncio.Event()

    def send(self, clear=False):
        self.ready_event.set()
        if clear:
            self.ready_event.clear()

    async def wait(self, should_wait=True):
        # Each wait task does a put and submits a subtask; compare the
        # generated object IDs and parent counters across the two calls.
        ray.put(1)
        ray.get(_put.remote(2))
        if should_wait:
            await self.ready_event.wait()


@ray.remote
def _put(obj):
    return obj


signal = SignalActor.remote()
result_id = signal.wait.remote(should_wait=False)
result_id = signal.wait.remote()
```
If you view the debug logs showing the put object ID generated by each `signal.wait.remote()` call, you'll see that the underlying task ID is the same (randomly generated) ID for both, despite the put objects coming from two different tasks, and that the put index keeps incrementing across the two wait tasks.
If you add `<< ", parent_task_id=" << ParentTaskId() << ", parent_counter=" << ParentCounter();` to the stream in `TaskSpecification::DebugString()`, you'll also see that the tasks submitted by the separate wait tasks have the same parent task ID (the randomly generated one), and that the parent counter is 2 for the second `_put` task instead of 1, as it should be.
I think I know exactly why this happens, but I'd like to understand the implications of the wrong object IDs in this case. Are there implications beyond it being hard to debug (e.g., functionality issues)?
@rkooo567 This issue was discovered while working on a solution for this issue in this PR, which involves using a task's num_returns as a base for the put index in order to guarantee no object ID conflicts between put objects and return objects. That obviously fails for async Python tasks since the current task spec is not set in the event loop thread. I'll be pushing up a workaround as a stopgap for that PR, if I can find one.
AFAICT, there aren't any hard functionality issues within master, but this definitely breaks the hierarchical object ID model (which _is_ user-facing, IMO) and the semantics behind the put and task indices: async task put object IDs don't have the right task ID set, and task spec fields (parent task ID and parent task counter) are being incorrectly set for subtasks of async tasks. The WorkerThreadContext abstraction appears to offer semantics/invariants that don't hold for async Python tasks, which will be an issue whenever anyone tries to extend it or use it in any new meaningful way (as I'm trying to do); it's super, super fragile right now.
For a longer-term solution, we'd probably want the put and task indices maintained in Python coroutine-local storage (a `contextvar`, perhaps?), with the equivalent done in C++ land via fiber-local storage. I'm still thinking about the best way to do that without creating a large refactor.
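As a minimal sketch of that direction (hypothetical names, not a real Ray API): each `asyncio.Task` runs in its own copy of the current context, so a `ContextVar`-backed put index is isolated per task rather than shared per thread.

```python
import asyncio
import contextvars

# Hypothetical per-task put index held in coroutine-local storage.
_put_index = contextvars.ContextVar("put_index", default=0)

def next_put_index():
    # Read-modify-write on the ContextVar only affects the current context.
    index = _put_index.get() + 1
    _put_index.set(index)
    return index

async def fake_task(name):
    # Each asyncio.Task runs in its own copy of the context, so the
    # counters below don't interleave across the two concurrent tasks.
    print(name, next_put_index())  # always prints 1
    await asyncio.sleep(0)         # yield, forcing the tasks to interleave
    print(name, next_put_index())  # always prints 2

async def main():
    await asyncio.gather(fake_task("a"), fake_task("b"))

asyncio.run(main())
```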
Thanks for the detailed information.
This behavior should be fixed, as I'd like to support a task context (so users can print their task ID inside a method) in the near future. I will set P2 because it isn't a hard requirement for 1.0; relying on object IDs is not something we officially support.
Can you try this

> I'll be pushing up a workaround as a stopgap for that PR, if I can find one.

and let me know if you can find any workaround? If it's a hard blocker, I'll try to spend some time at night on a fix.
@edoakes Is @ijrsvt planning to fix this issue?
There are no immediate plans to fix it.
@edoakes and I chatted a little bit about this offline. We think one possible approach would be:
maintain a mapping of `task_id` -> `worker_context`, so that each task looks up its own context instead of relying on thread-local state. These are just rough thoughts, and hopefully they can guide a more fully fleshed-out design.
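A rough sketch (hypothetical names, not Ray's actual code) of what such a `task_id` -> `worker_context` mapping might look like, with keyed lookups replacing the single thread-local slot:

```python
import threading
from dataclasses import dataclass

@dataclass
class TaskContext:
    task_id: str
    put_index: int = 0
    task_index: int = 0

class WorkerContextMap:
    """Per-task contexts keyed by task ID instead of one thread-local slot."""

    def __init__(self):
        self._lock = threading.Lock()
        self._contexts = {}  # task_id -> TaskContext

    def begin_task(self, task_id):
        with self._lock:
            self._contexts[task_id] = TaskContext(task_id)

    def get(self, task_id):
        with self._lock:
            return self._contexts[task_id]

    def end_task(self, task_id):
        with self._lock:
            self._contexts.pop(task_id, None)
```

With something like this, puts and subtask submissions would take (or infer) the current task ID and bump counters on the right context, regardless of which thread they run on.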
That's actually pretty similar to the idea @clarkzinzow and I discussed before.
Also, I think this is pretty crucial for the Serve use case; it can probably break custom metrics if this issue isn't fixed.