Hi everyone, I have been testing with concurrency tags and have a question about some behavior I'm seeing with Docker. It seems that after a task has been waiting on a concurrency tag limit for 10 minutes, the Prefect Docker agent starts continuously spinning up new containers for the flow that is waiting to start. If I run “docker ps”, I can see 20 or more containers running for that flow.
Expected behavior: while a flow is paused on a concurrency tag limit, the agent should not continuously spin up new Docker containers.
It should be fairly simple to reproduce.
Prefect Version: 0.10.6
Docker Engine Version: 19.03.6-ce
Storage Type: Docker
OS: Replicated the issue on both Mac and Amazon Linux 2
Tagging @zdhughes for any ideas on this. All the agent does is grab flow runs from the getRunsInQueue mutation, so something must be enqueueing them.
@cicdw thank you for this fix! It has made things significantly better. The agent now spins up new containers for a waiting task less frequently: once every 10 minutes instead of once every minute. However, we've been getting intermittent "Internal Server Error" failures, and I believe they may have something to do with too many containers being launched. We might have 5 or so tasks waiting on another task that has been running for an hour, so we end up with about 30 containers running on the daemon. I tested this with 2 flows, letting one sit idle on a concurrency tag for a few hours. Nothing else was running, and eventually we got the error. Any thoughts on this?
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 4c3b5d67-83b0-420c-8a0d-c70573c78b47: provided a running state but associated flow run 343f15ae-bb58-4e9c-9270-f90b70cecf8f is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 115, in call_runner_target_handlers
    cache_for=self.task.cache_for,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1185, in set_task_run_state
    version=version,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 226, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 4c3b5d67-83b0-420c-8a0d-c70573c78b47: provided a running state but associated flow run 343f15ae-bb58-4e9c-9270-f90b70cecf8f is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
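The ~30 containers mentioned above is roughly what you would expect if each waiting task is resubmitted once per 10-minute interval: 5 tasks × (60 / 10) resubmissions = 30. A minimal back-of-the-envelope sketch of that estimate, assuming every resubmitted container stays alive (itself waiting) until the concurrency slot frees up; `redundant_containers` is a hypothetical helper, not part of Prefect:

```python
def redundant_containers(waiting_tasks: int, wait_minutes: int, resubmit_every: int = 10) -> int:
    """Estimate how many extra containers pile up while tasks wait.

    Assumes (hypothetically) one new container per waiting task per
    resubmission interval, and that none of those containers exit early.
    """
    resubmissions_per_task = wait_minutes // resubmit_every
    return waiting_tasks * resubmissions_per_task


# 5 tasks waiting behind a task that has been running for an hour
print(redundant_containers(waiting_tasks=5, wait_minutes=60))  # 30
```

With the earlier once-per-minute behavior (`resubmit_every=1`), the same scenario would have produced 300 containers, which is consistent with the fix being a significant improvement even though the pile-up persists.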
Hi @cvrooman - Hmmm I'm looking into this; are you still running Prefect Core version 0.10.6?
Thank you! Nope, I've since upgraded Prefect on the server with the docker agent to 0.12.2. The flows have also all been rebuilt/registered.
Hi @cvrooman - thanks for raising this; very interesting situation! Question for you: did you manually update any states on your Flow or your Tasks during this long running test?
@cicdw sure thing, you're referring to updating with "mark as"?
Not before the first flow failed with the internal_server_error. I cancelled the remaining test flow afterwards. Otherwise I didn't touch the flows after they were launched.
Yup exactly - thanks! That actually helps a lot. I haven't been able to recreate quite yet but I'm on the trail. For some additional info, it appears that _somewhere_ in Prefect, the Queued states are compounding and eventually there are so many Queued states on Queued states that a recursion error is hit.
In code:
from prefect.engine.state import Queued
from prefect.serialization.state import StateSchema

schema = StateSchema()
state = Queued(state=Queued())
for idx in range(500):
    print(idx)
    # wrap the previous state one level deeper; loading eventually hits a recursion error
    state = Queued(state=state)
    schema.load(state.serialize())
The catch is that I can't isolate where we might accidentally do this, but I'll report back when I've found it!
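To illustrate why compounding Queued-on-Queued states eventually fails, here is a self-contained toy model (plain Python classes, not Prefect's actual `State` or `StateSchema`). Recursive serialization uses one stack frame per nesting level, so a deep enough chain raises `RecursionError`. The `queue` helper shows one hypothetical way to avoid the compounding by never nesting a Queued state directly inside another; it is purely illustrative and not necessarily what the eventual fix does.

```python
import sys
from typing import Optional


class State:
    """Toy stand-in for a state that may wrap an inner state."""

    def __init__(self, state: Optional["State"] = None) -> None:
        self.state = state  # the wrapped ("inner") state, if any

    def serialize(self) -> dict:
        # Recursive serialization: one Python stack frame per nesting level
        inner = self.state.serialize() if self.state is not None else None
        return {"type": type(self).__name__, "state": inner}


class Queued(State):
    pass


def queue(state: State) -> Queued:
    """Hypothetical guard: re-queuing an already-Queued state keeps its
    inner state instead of wrapping Queued inside Queued."""
    if isinstance(state, Queued):
        return Queued(state=state.state)
    return Queued(state=state)


def depth(state: Optional[State]) -> int:
    """Count nesting levels iteratively (no recursion)."""
    n = 0
    while state is not None:
        n += 1
        state = state.state
    return n


# Naive wrapping: nesting grows on every requeue until serialization overflows
naive = Queued()
for _ in range(sys.getrecursionlimit() + 10):
    naive = Queued(state=naive)
try:
    naive.serialize()
    overflowed = False
except RecursionError:
    overflowed = True

# Guarded wrapping: depth stays constant no matter how often we requeue
guarded = queue(State())
for _ in range(sys.getrecursionlimit() + 10):
    guarded = queue(guarded)

print(overflowed, depth(guarded))  # True 2
```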
Ah okay, I see. This might not be super helpful, but just to see the issue with Docker on my local machine, I used this flow and registered it twice under different names.
from prefect import task, Flow
from prefect.environments.storage import Docker
from time import sleep


# All four tasks share the 'test' concurrency tag; each sleeps for 12600 s (3.5 hours)
@task(tags=['test'])
def my_func1():
    sleep(12600)


@task(tags=['test'])
def my_func2():
    sleep(12600)


@task(tags=['test'])
def my_func3():
    sleep(12600)


@task(tags=['test'])
def my_func4():
    sleep(12600)


with Flow('testing1') as flow:
    my_func1()
    my_func2()
    my_func3()
    my_func4()

flow.storage = Docker(
    base_image="python:3.7",
    image_tag="latest",
)

if __name__ == '__main__':
    flow.register(project_name="test-proj", labels=['test-flows'])

Hi @cvrooman thanks for your patience - I've identified the issue, and the fix was ultimately a simple one: https://github.com/PrefectHQ/prefect/pull/2965
Once this change is deployed to the backend, you don't need to update your Flow version (although you can).
We'll try to have this released ASAP, hopefully later today 🙏
@cicdw Ah, this is great thanks for the quick turn around on this!! Would love to test the update as soon as the release is available. This fix should really help us.
No problem! This has been released on the backend so you should be good to go @cvrooman - let us know if you encounter any issues!
@cicdw just following up on my testing. I've rebuilt these test flows and restarted the Prefect Docker agent, but new containers still seem to be spinning up every 10 minutes. Would this change also need to be applied to a specific instance we are assigned to on the backend? Or am I missing a step on my side?
Ah @cvrooman my apologies, I was focused on the recursion error that you saw (which should now be fixed)!
Yes, at this exact moment both Queued and Retrying tasks result in job submissions every 10 minutes, and that's an issue on the Cloud side, not yours. The underlying reason is that after 10 minutes we (incorrectly) assume that no process is still waiting to run the task. We have scoped out a fix for this in the backend that detects processes which are still heartbeating while waiting on the Queued / Retrying task, so that we can prevent that work from being released to the agent again.
I will get that work reprioritized and will report back when it's released!
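The backend fix described above can be pictured as a decision rule: only release a Queued / Retrying task's work back to an agent once enough time has passed *and* no process is still heartbeating for it. A minimal sketch, assuming hypothetical names and thresholds (`should_release_to_agent`, `REQUEUE_AFTER`, `HEARTBEAT_TIMEOUT`); this is not Prefect Cloud's actual code, which isn't shown in this thread, and the heartbeat timeout value is invented for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical thresholds: 10 minutes matches the behavior described above;
# the heartbeat timeout is an assumption for illustration only
REQUEUE_AFTER = timedelta(minutes=10)
HEARTBEAT_TIMEOUT = timedelta(minutes=2)


def should_release_to_agent(
    queued_since: datetime,
    last_heartbeat: Optional[datetime],
    now: datetime,
) -> bool:
    """Decide whether a Queued/Retrying task's work should go back to an agent.

    A purely time-based rule resubmits work every REQUEUE_AFTER; the extra
    heartbeat check avoids that while a waiting process is still alive.
    """
    if now - queued_since < REQUEUE_AFTER:
        return False  # too early to assume the work was abandoned
    if last_heartbeat is not None and now - last_heartbeat < HEARTBEAT_TIMEOUT:
        return False  # a process is still heartbeating and waiting: don't resubmit
    return True  # nobody appears to be waiting, so release the work


now = datetime(2020, 1, 1, 12, 0, tzinfo=timezone.utc)
queued = now - timedelta(minutes=15)
print(should_release_to_agent(queued, now - timedelta(seconds=30), now))  # False
print(should_release_to_agent(queued, now - timedelta(minutes=5), now))   # True
print(should_release_to_agent(queued, None, now))                          # True
```

Under this rule, a task blocked on a concurrency slot whose container keeps heartbeating is never resubmitted, which is exactly the situation that produced the redundant containers in this thread.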
Update for you @cvrooman : we have identified the root cause of the redundant containers being created every 10 minutes, and we have a fix in our staging environment. It will require an upgrade to 0.12.5 when that is released early next week - I'll keep you in the loop!
Final update @cvrooman - the required changes on the backend have been released! If you run your work off of master (or 0.12.5 after it's released tomorrow), you shouldn't experience the repeated redundant submissions of work.
@cicdw sorry for the delay in my response - I appreciate your help and updates on this. This is great news, thanks for tracking this down! We ended up coming up with an alternative flow that ran more linearly (without concurrency tags) for the time being. I'll keep an eye out for 0.12.5 today and give it a shot!