Environment variables that collide with certain prescribed Prefect variables in a custom worker or scheduler pod spec for Dask-K8s don't have an effect.
Consider a worker pod spec like the one provided in the library:
...
env:
...
- name: PREFECT__LOGGING__LEVEL
value: "DEBUG"
...
If you create a custom worker spec like this, the log level will get overridden and you will not be able to get debug logs.
If you set environment variables in a pod spec, it will not get overridden.
Pass the file prefect/src/prefect/environments/execution/dask/worker_pod.yaml as the argument worker_spec_file to DaskKubernetesEnvironment and the eventual Prefect worker will not log at the debug level.
I think the reason is actually pretty simple and while I don't know this exactly, it seems right.
There are four methods in DaskKubernetesEnvironment that populate yaml specs. The first two are provided for reference and if someone wants to compare how those work to the custom spec methods.
_populate_job_yaml: Populate the provided job.yaml_populate_worker_pod_yaml: Populate the provided worker_pod.yaml_populate_scheduler_spec_yaml: Populate a custom scheduler spec file provided by the user._populate_worker_spec_yaml: Populate a custom worker spec file provided by the user.If you provide custom files, these parts here for the respective files will make sure the environment variables in the specs have all the ones needed for Prefect, just in case you forgot to provide them.
https://github.com/PrefectHQ/prefect/blob/0.13.4/src/prefect/environments/execution/dask/k8s.py#L460-L505
https://github.com/PrefectHQ/prefect/blob/0.13.4/src/prefect/environments/execution/dask/k8s.py#L526-L569
However, the logic is that env.extend(env_values) will extend the list of environment variables, even if there are duplicates. Sticking with the above example with trying to set PREFECT__LOGGING__LEVEL to "DEBUG", when looking at the output of kubectl describe on a pod, we'll see the variable listed twice. The first time is our attempted specification and the second time is the one added by the Prefect code. K8s only seems to consider the latter value.
Furthermore, since the log level populated by Prefect comes from the config and _populate_scheduler_spec_yaml and _populate_worker_spec_yaml are called in execute and run, respectively, you can't set the config locally to affect populating the file. They get populated at the moment they are needed. This means there's a bit of a cascade effect: we had go all the way to the agent and change PREFECT__LOGGING__LEVEL in order to get the log level to propagate down to the worker.
Adjust the logic so that all Prefect default values are at the beginning of the list (if this is indeed the logic K8s uses). Otherwise, only add the Prefect values if they are not specified in the spec file.
@alexifm Good call out! Yeah those places in population functions where the env vars are extended we could do a simple check to only add them if they are not already provided.
As mentioned on Slack, I have the exact same issue!
dask-kubernetes cluster running on AWS EKS with the following minimal flow example:
import os
from datetime import datetime
​
import prefect
from prefect import Flow, task
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker
​
​
@task
def debug_logging_task():
logger = prefect.context.get("logger")
​
logger.debug("a debug message")
logger.info(f"PREFECT__LOGGING__LEVEL: {os.environ.get('PREFECT__LOGGING__LEVEL')}")
​
​
with Flow("minimal_flow") as flow:
​
debug_logging_task()
​
​
flow.environment = DaskKubernetesEnvironment(
min_workers=1, max_workers=10, labels=["k8s"]
)
​
flow.storage = Docker(
python_dependencies=[
# "numpy",
# "pandas",
# "snowflake-connector-python[pandas]==2.3.2",
# "snowflake-sqlalchemy>=1.2.4",
"tqdm",
],
registry_url="495775544086.dkr.ecr.eu-central-1.amazonaws.com",
image_name="minimal_flow",
image_tag="beta_" + datetime.now().strftime("%Y%m%d_%H%M%S"),
env_vars={"PREFECT__LOGGING__LEVEL": "DEBUG"},
)
flow.register(project_name="eks_test_01")
PS: We additionally added the environment variable to the whole prefect agent when spinning it up using pulumi, but the error remains.

Just read your thread on the slack. Yea, you're coming across the same problem we faced. The solution should be pretty straightforward. Sorry haven't had the bandwidth to submit a PR on this.
It boils down to two locations (lines following 505 and 569) in src\prefect\environments\execution\dask\k8s.py with the following code, right?
# set environment variables
env = yaml_obj["spec"]["template"]["spec"]["containers"][0].get("env")
if not env:
yaml_obj["spec"]["template"]["spec"]["containers"][0]["env"] = []
env = yaml_obj["spec"]["template"]["spec"]["containers"][0]["env"]
env.extend(env_values)
# set image
yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"] = docker_name
return yaml_obj
with env_values defined as:
env_values = [
...,
{
"name": "PREFECT__LOGGING__LEVEL",
"value": str(prefect.config.logging.level),
},
...,
]
I think your approach that sets the environment variables on the Docker image might need some other fix. The execution environment is setting the variables in the pod specification and has no awareness of what's set on the image. For us, if we fix how the pod spec is being handled, we'll be fine. But I think you'll still run into the problem.
ETA: You could change how you do things and set the variable on the execution side instead of the storage side. It would perhaps help people if the storage raised a warning that setting one of those special prefect environment variables might get superseded.
@joshmeek, does it make sense to provide the environment variables as a dictionary and write a small dict_to_yaml function that transforms it into a yaml file with some check whether the environment variables are already defined?
I think your approach that sets the environment variables on the Docker image might need some other fix. The execution environment is setting the variables in the pod specification and has no awareness of what's set on the image. For us, if we fix how the pod spec is being handled, we'll be fine. But I think you'll still run into the problem.
Oh, good point!
In my opinion, if it raises a warning that describes how to set it in the execution environment, that would be a sufficient solution.
I already added the PREFECT__LOGGING__LEVEL environment variable to the prefect agent k8s deployment (using pulumi for infrastructure as code), is that identical with the execution environment?
ETA: How do you set the variable on the execution side?
We're both interested in affecting the environment in which the workers are running. One approach is with an execution specification, ie the yaml spec which is what our group did, the other is via Prefect's approach to storage, ie the Docker image as you did. Right now, as far as I can tell the only way to get an environment variable like PREFECT__LOGGING__LEVEL set is to set it on the agent. Setting it any other way doesn't appear to work.
How do you set the variable on the execution side?
This is the default worker spec that we tried to override (it mostly works, just not the environment variables)
https://github.com/PrefectHQ/prefect/blob/0.13.4/src/prefect/environments/execution/dask/worker_pod.yaml
Does that clarify what I was saying?
Thanks for the clarifications, I think I understood.
Possibilities to set PREFECT__LOGGING__LEVEL:
k8s prefect agent deploy (infrastructure specification)flow.storage = Docker(..., env_vars={}) (storage specification)worker_pod.yaml (execution specification)Hope I got it right? :D
ETA:
Normally, you would not want to do it on the infrastructure specification, because you want to change it without having to change infrastructure, right? However, I tried it out of desperation... x)