Prefect: How to print/capture stdout for running a ShellTask

Created on 9 May 2019 · 2 Comments · Source: PrefectHQ/prefect

I was trying to run a simple shell command and had a hard time figuring out how to print or capture its output. I figured it out, but thought it was worth pointing out for others.

from prefect import Flow
from prefect.tasks.shell import ShellTask

task = ShellTask()

with Flow("shell") as f:
    result = task(command="ls")
    print(result)

out = f.run()

Note that printing result shows a <Task: ShellTask> object rather than the command's output, which is not what most folks are interested in.

<Task: ShellTask>
[2019-05-09 02:33:22,612] INFO - prefect.FlowRunner | Beginning Flow run for 'shell'
[2019-05-09 02:33:22,613] INFO - prefect.FlowRunner | Starting flow run.
[2019-05-09 02:33:22,616] INFO - prefect.TaskRunner | Task ''ls'': Starting task run...
[2019-05-09 02:33:22,616] INFO - prefect.TaskRunner | Task ''ls'': finished task run for task with final state: 'Success'
[2019-05-09 02:33:22,617] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2019-05-09 02:33:22,632] INFO - prefect.TaskRunner | Task 'ShellTask': finished task run for task with final state: 'Success'
[2019-05-09 02:33:22,633] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

I had to call the run() method on the task to capture stdout. It was a minor change, but it caused me a fair amount of confusion because the ShellTask examples did not use run() to execute the task.

task = ShellTask()

with Flow("shell") as f:
    result = task.run(command="ls")
    print(result)

out = f.run()

Most helpful comment

Hi @henry74! There is a lot to unpack here. I recommend you check out some of our concept docs, in particular our "First Steps" document here: https://docs.prefect.io/guide/getting_started/first-steps.html#flows. (Note the section on "Deferred Execution").

If you want to use the output of your ShellTask inside of a workflow, you need to either:

  • create a _task_ for it
  • extract it from the Task's State

If you simply call the task's run method yourself, you aren't actually building a Prefect Flow or using any of Prefect's features, and your command will run _immediately_. The purpose of Prefect is to _encapsulate_ workflow logic in a deployable, reproducible, debuggable way. So, for example, if you wanted to run your flow on a schedule, you would need to _defer_ the execution of your command until the scheduled run time.
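To make the difference between building and running concrete, here is a toy model of deferred execution in plain Python. It is only a sketch and not how Prefect is implemented: ToyTask, ToyFlow, Pending, and fake_shell are hypothetical names invented for this illustration, and no real shell command is executed.

```python
# Toy model of deferred execution. This is NOT Prefect's implementation;
# ToyTask, ToyFlow, Pending and fake_shell are hypothetical names.

class Pending:
    """Placeholder returned at build time; it holds no result yet."""

    def __init__(self, task, args):
        self.task = task
        self.args = args

    def __repr__(self):
        return f"<Pending: {self.task.name}>"


class ToyTask:
    """Pairs a name with the function to call later."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn


class ToyFlow:
    """Collects pending calls while building and executes them in run()."""

    def __init__(self):
        self.pending = []

    def add(self, task, *args):
        node = Pending(task, args)
        self.pending.append(node)
        return node

    def run(self):
        results = {}
        # Nodes were appended in build order, so upstream nodes run first.
        for node in self.pending:
            resolved = [
                results[a] if isinstance(a, Pending) else a for a in node.args
            ]
            results[node] = node.task.fn(*resolved)
        return results


calls = []  # records when the wrapped function really executes


def fake_shell(command):
    # Stand-in for a shell call: returns bytes, like captured stdout.
    calls.append(command)
    return f"output of {command}".encode()


shell = ToyTask("shell", fake_shell)
decode = ToyTask("decode", lambda raw: raw.decode())

flow = ToyFlow()
raw = flow.add(shell, "ls")      # build time: nothing runs
text = flow.add(decode, raw)     # depends on the placeholder above
built_repr = repr(raw)           # a placeholder, not the output
calls_before_run = list(calls)   # still empty at this point

results = flow.run()             # run time: the functions execute now
print(built_repr)
print(results[text])
```

In this toy, printing raw at build time shows only a placeholder, which mirrors the <Task: ShellTask> line in the original question. The output exists only in the dictionary returned by run().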

To do what I _think_ you want, you should instead do one of the following:

Create a Task to print the output

from prefect import task, Flow
from prefect.tasks.shell import ShellTask

my_task = ShellTask()

@task
def print_output(output):
    print(output.decode())

with Flow("shell") as f:
    result = my_task(command="ls")
    print_output(result)

Note that _nothing has been executed yet_. This allows us to save the Flow, deploy it somewhere, etc. before executing it. If we do want to run the flow right now, we can now run:

f.run()

and you will see the output printed to your screen, because the print_output task runs after the ShellTask.

Inspect the Task's state _after the Flow has run_

If, however, you simply want to inspect the output _after the flow has run_, you can find the output stored inside of the corresponding _state_ object. So in your example above, we could do:

from prefect import Flow
from prefect.tasks.shell import ShellTask

my_task = ShellTask()

with Flow("shell") as f:
    output = my_task(command="ls")

flow_state = f.run() # flow_state is a `State` object
shell_output = flow_state.result[output].result.decode()
print(shell_output)

And again, we will see the output printed to the screen.

I hope that clarifies why you were seeing the behavior you were seeing. I'll leave this issue open for a bit for others to chime in.

Thanks for the concise explanation. It was very helpful to see an example of pulling results from the state object.

Prior to seeing this, I did create a function which used task.run, but it was wrapped with the task decorator, so everything was working fine. Based on your explanation above, it looks like the approach was legit, although it felt a bit strange to use a task object directly inside a function with the task decorator. Seemed a little bit dirty. :-) (I also had the decode() bit, since I realized I was getting a bytes object that printed with all the newlines escaped.)

Python isn't a language I've used extensively, so some of this is a syntax learning curve, but there is definitely some Prefect magic I'm now more aware of. Great library btw - Airflow was certainly a pain, which you highlighted well in your most recent blog post.
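For anyone puzzled by the escaped newlines: they come from printing undecoded bytes. The sketch below shows the same effect with the standard library subprocess module instead of ShellTask, so it is an analogy rather than Prefect's own code path. It runs a small Python child process (via sys.executable) in place of ls to stay portable, and the file names are made up.

```python
import subprocess
import sys

# Run a child process that prints two lines. sys.executable is used in
# place of a shell command such as `ls` so the sketch runs anywhere.
raw = subprocess.check_output(
    [sys.executable, "-c", "print('a.txt'); print('b.txt')"]
)

print(type(raw).__name__)  # bytes: check_output returns undecoded output
print(raw)                 # repr of the bytes, with newlines shown escaped

text = raw.decode()        # decode (UTF-8 by default) to get a str
print(text)                # now prints one name per line

lines = text.splitlines()  # handles both \n and \r\n line endings
```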

One more note to others:

task = ShellTask() # this shadows the `task` decorator imported from prefect, so @task breaks; don't do this
my_task = ShellTask() # much better idea; name it anything else besides `task`
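The naming problem is ordinary Python name shadowing, which the following sketch reproduces without Prefect installed. Both the task function and FakeShellTask here are hypothetical stand-ins that only mimic the shape of the real objects. With real Prefect the exact failure would look different, but the cause is the same: the name task no longer points at the decorator.

```python
# Stand-ins so the sketch runs without Prefect; both are hypothetical.
def task(fn):
    """Stand-in for a decorator named `task`."""
    fn.is_task = True
    return fn


class FakeShellTask:
    """Stand-in for a task class whose instances are callable."""

    def __call__(self, command=None):
        return f"would run: {command!r}"


@task
def before(x):
    return x


task = FakeShellTask()  # rebinding: `task` is no longer the decorator


@task  # this now calls FakeShellTask.__call__(after) instead
def after(x):
    return x


print(hasattr(before, "is_task"))  # True: decorated by the real decorator
print(type(after).__name__)        # str: `after` is no longer a function
```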