I've been trying to run a simple shell command and had a difficult time figuring out how to print/capture the output of the command. I figured it out, but thought it was worth pointing out for others.
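from prefect import Flow
from prefect.tasks.shell import ShellTask
task = ShellTask()
with Flow("shell") as f:
    result = task(command="ls")
    print(result)  # prints the Task object, not the command output
out = f.run()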
Note that this prints a `<Task: ShellTask>` object, which is not what most folks are interested in.
<Task: ShellTask>
[2019-05-09 02:33:22,612] INFO - prefect.FlowRunner | Beginning Flow run for 'shell'
[2019-05-09 02:33:22,613] INFO - prefect.FlowRunner | Starting flow run.
[2019-05-09 02:33:22,616] INFO - prefect.TaskRunner | Task ''ls'': Starting task run...
[2019-05-09 02:33:22,616] INFO - prefect.TaskRunner | Task ''ls'': finished task run for task with final state: 'Success'
[2019-05-09 02:33:22,617] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2019-05-09 02:33:22,632] INFO - prefect.TaskRunner | Task 'ShellTask': finished task run for task with final state: 'Success'
[2019-05-09 02:33:22,633] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Had to call the run() method on the task to capture the stdout - it was a minor change that caused me a fair amount of confusion since the ShellTask examples did not use the run() method to execute the task.
task = ShellTask()
with Flow("shell") as f:
    result = task.run(command="ls")
    print(result)
out = f.run()
Hi @henry74 ! So there is a lot to unpack here. I recommend you check out some of our concept docs, and maybe in particular our "First Steps" document here: https://docs.prefect.io/guide/getting_started/first-steps.html#flows. (Note the section on "Deferred Execution").
If you want to use the output of your ShellTask inside of a workflow, you need to either pass it to another task in the Flow, or read it from the state that the flow run returns.
If you simply call the task's run method yourself, you aren't actually building a Prefect Flow or using any of Prefect's features, and your command will run _immediately_. The purpose of Prefect is to _encapsulate_ workflow logic in a deployable, reproducible, debuggable way. So, for example, if you wanted to run your flow on a schedule, you would need to _defer_ the execution of your command until the scheduled run time.
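To make the idea of deferred execution concrete, here is a minimal sketch in plain Python. `MiniFlow` and `list_files` are hypothetical names made up for illustration; this is not how Prefect is implemented, only the general pattern of recording work first and running it later.

```python
import os


class MiniFlow:
    """Hypothetical, heavily simplified flow: collects steps, runs them later."""

    def __init__(self):
        self.steps = []

    def add(self, func, *args):
        # Registering a step does not call func; it only records it.
        self.steps.append((func, args))
        return len(self.steps) - 1  # a handle to the step, not its result

    def run(self):
        # Only now are the recorded steps executed, in order.
        return [func(*args) for func, args in self.steps]


calls = []  # records every time the function body actually executes


def list_files(directory):
    calls.append(directory)
    return sorted(os.listdir(directory))


flow = MiniFlow()
handle = flow.add(list_files, ".")
print(handle)  # a handle, not a directory listing
print(calls)   # [] because nothing has run yet

results = flow.run()
print(calls)   # ['.'] because the step ran during flow.run()
```

Printing `handle` before the run is the analogue of printing the Task object in the original question: the real output only exists once the flow has been run.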
To do what I _think_ you want, you should instead do one of the following:
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
my_task = ShellTask()
@task
def print_output(output):
    print(output.decode())
with Flow("shell") as f:
    result = my_task(command="ls")
    print_output(result)
Note that _nothing has been executed yet_. This allows us to save the Flow, deploy it somewhere, etc. before executing it. If we do want to run the flow right now, we can now run:
f.run()
and you will see the output printed to your screen, because the print_output task runs after the ShellTask.
If, however, you simply want to inspect the output _after the flow has run_, you can find the output stored inside of the corresponding _state_ object. So in your example above, we could do:
from prefect import Flow
from prefect.tasks.shell import ShellTask
my_task = ShellTask()
with Flow("shell") as f:
    output = my_task(command="ls")
flow_state = f.run() # flow_state is a `State` object
shell_output = flow_state.result[output].result.decode()
print(shell_output)
And again, we will see the output printed to the screen.
I hope that clarifies why you were seeing the behavior you were seeing. I'll leave this issue open for a bit for others to chime in.
Thanks for the concise explanation - was very helpful to see an example on pulling results from the state object.
Prior to seeing this, I did create a function which used task.run, but it was wrapped with the task decorator so everything was working fine. Based on your explanation above it looks like the approach was legit, although it felt a bit strange using a task object directly in a function with the task decorator. Seemed a little bit dirty. :-) (I also had the decode() bit, since I realized I was getting a bytes object with all the escaped newlines.)
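For anyone else puzzled by the escaped newlines, here is a small sketch of the bytes-versus-string difference. It uses the standard library's `subprocess` as a stand-in for a shell command, on the assumption that the task hands back raw bytes in the same way; the child command is made up for illustration.

```python
import subprocess
import sys

# Run a small child process and capture its standard output.
raw = subprocess.run(
    [sys.executable, "-c", "print('a.txt'); print('b.txt')"],
    capture_output=True,
    check=True,
).stdout

print(type(raw))     # <class 'bytes'>
print(raw)           # bytes repr: newlines appear as escape sequences
text = raw.decode()  # decode to str (UTF-8 by default)
print(text)          # newlines are rendered as real line breaks
```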
Python isn't a language I've used extensively, so some of this is a syntax learning curve, but there is definitely some Prefect magic I'm now more aware of. Great library btw - Airflow was certainly a pain, which you highlighted well in your most recent blog post.
One more note to others:
task = ShellTask() # this shadows the imported `task` decorator, so `@task` stops working; don't do this
my_task = ShellTask() # much better idea; name it anything else besides `task`
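The reason is ordinary Python name shadowing. The sketch below uses a hypothetical stand-in decorator named `task` and a plain `object()` in place of a real task instance, so the exact error you see with a real ShellTask may differ; the point is only that the name no longer refers to the decorator.

```python
def task(func):
    """Hypothetical stand-in for a decorator imported under the name `task`."""
    func.is_task = True
    return func


@task
def first(x):
    return x


# Rebinding the name replaces the decorator with an ordinary object.
task = object()

error = None
try:
    @task  # now tries to call the object, not the decorator
    def second(x):
        return x
except TypeError as exc:
    error = exc
    print(f"decorator no longer works: {exc}")
```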