Pipelines: Trouble Moving file from one pipeline to the next

Created on 24 Jul 2019 · 8 Comments · Source: kubeflow/pipelines

Hi All,
As the title says, I have been struggling to find a way to transport an output file from one pipeline to another pipeline as an input file. I am running Kubeflow v0.5.0 on GCP. I previously tried using the file_outputs argument to pass it, but that gave me a size error saying the file I tried to output exceeded the byte limit. My Python script in the tftload pipeline pulls a 1 GB pickle file from GCP Storage and eventually converts it to a CSV file of about 600 MB. I couldn't find a way to increase the 100 MB limit on outputs. The exact error message is below:

This step is in Error state with this message: failed to save outputs: Request entity too large: limit is 104857600

To try to fix the size limit issue, I instead replaced file_outputs with output_artifact_paths, which is shown in the code below. However, the file that I wish to be moved to the second pipeline is not showing up in the latter pipeline's working directory, and hence my python script cannot preprocess the CSV data. Below is the code behind my two pipelines:
tftload = dsl.ContainerOp(
    name = 'load',
    image = 'loadingGCRurl:latest',
    arguments = [
        "--input_handle", input_handle_eval,
        "--outfile_prefix", outfile_prefix_eval,
        "--working_dir", '%s/%s/tftload' % (working_dir, '{{workflow.name}}'),
        "--project", project],
    # file_outputs = {'train_data': '/app/train_data.csv'},
    output_artifact_paths={'train-data': '/app/train_data.csv'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))

tftpreprocess = dsl.ContainerOp(
    name = 'preprocess',
    image = 'preprocessGCRurl:latest',
    # input_artifact_paths={'train-data': '/app/train_data.csv'},
    arguments = [
        "--input_handle", input_handle_eval,
        "--outfile_prefix", outfile_prefix_eval,
        "--working_dir", '%s/%s/tft-eval' % (working_dir, '{{workflow.name}}'),
        "--project", project,
        tftload.output_artifact_paths['train-data']],
    output_artifact_paths={
        'X-train': '/app/X_train.csv',
        'X-test': '/app/X_test.csv'}
    # file_outputs = {'X_train': 'X_train.csv',
    #                 'X_test': 'X_test.csv'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))

tftpreprocess.after(tftload)
If there is any advice you can give to help fix this issue, I would greatly appreciate it!

aresdk


Short answer: currently the Pipelines system can only help you pass small data between steps. For bigger outputs, your user code should take output URIs as arguments, upload the data there, and then output those URIs so that the next step can use them.
Please read https://www.kubeflow.org/docs/pipelines/sdk/component-development/
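A minimal sketch of that pattern, assuming a hypothetical `upload` helper: here it just copies to a local path so the example runs anywhere, whereas in a real step it would copy to a `gs://` URI with `gsutil` or a GCS client. The step writes its large output to the destination URI and emits only the URI string as a tiny output file, which is what would be listed in `file_outputs` and passed to the next step by value. The names `run_step` and `uri_output_file` are illustrative, not KFP APIs.

```python
import csv
import pathlib
import shutil
import tempfile


def upload(local_path: str, dest_uri: str) -> None:
    """Hypothetical stand-in for a real upload such as `gsutil cp` to a gs:// URI.

    To keep the sketch runnable anywhere, dest_uri is treated as a local path.
    """
    dest = pathlib.Path(dest_uri)
    dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(local_path, dest)


def run_step(rows, output_uri: str, uri_output_file: str) -> str:
    """Write a (possibly large) CSV, upload it to output_uri, and emit only the URI."""
    with tempfile.TemporaryDirectory() as tmp:
        local_csv = pathlib.Path(tmp) / "train_data.csv"
        with open(local_csv, "w", newline="") as f:
            csv.writer(f).writerows(rows)
        upload(str(local_csv), output_uri)

    # Only this tiny file would be listed in file_outputs, so the value passed
    # to the next step stays far below the output size limit.
    out = pathlib.Path(uri_output_file)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(output_uri)
    return output_uri
```

The downstream step then receives the URI as an ordinary string argument and downloads the file itself, so the 600 MB CSV never passes through the output-size-limited value channel.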

tftload.output_artifact_paths['train-data']
This is not valid. We should probably make output_artifact_paths private to avoid confusion.

I instead replaced file_outputs with output_artifact_paths, which is shown in the code below. However, the file that I wish to be moved to the second pipeline is not showing up in the latter pipeline's working directory, and hence my python script cannot preprocess the CSV data.

output_artifact_paths can handle big data. Unfortunately, the corresponding ability to declare input artifacts and pass artifacts between steps is still missing. Pending PR: https://github.com/kubeflow/pipelines/pull/791

@Ark-kun
I have followed the tutorial on passing larger data, using OutputPath and OutputFile, but I still receive the 'This step is in Error state with this message: failed to save outputs: Request entity too large: limit is 3145728' error.

I have the latest Kubeflow release deployed on GCP via the CLI, following the instructions on the official website.

Is there a way to overcome this?

I still receive the 'This step is in Error state with this message: failed to save outputs: Request entity too large: limit is 3145728'

Are you just running the unmodified tutorial notebook?

using OutputPath and OutputFile
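The important part is using InputPath. The passing method depends on how the data is consumed: if the consuming component takes the input as a plain value instead of an InputPath, the data is still passed by value and is subject to the size limit. I guess this is a non-trivial implementation detail.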
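A minimal sketch of the lightweight-component style, assuming the KFP v1 SDK (`kfp.components.InputPath` / `OutputPath`). The producer writes to a path it is handed and the consumer reads from a path, so the data can travel as a file artifact rather than an inlined value; that only works if both sides use path annotations. The `except` branch defines hypothetical stand-ins (not KFP APIs) purely so the functions can be exercised without the SDK installed; `make_data` and `count_rows` are illustrative names.

```python
import tempfile

try:
    # KFP v1 SDK annotations: parameters marked with them receive file paths.
    from kfp.components import InputPath, OutputPath
except ImportError:
    # Hypothetical stand-ins so the sketch runs without the SDK; not KFP APIs.
    def InputPath(_type=None):
        return str

    def OutputPath(_type=None):
        return str


def make_data(rows: int, data_path: OutputPath(str)):
    """Producer: write a potentially large output to a path chosen by the system."""
    with open(data_path, "w") as f:
        for i in range(rows):
            f.write(f"{i},{i * i}\n")


def count_rows(data_path: InputPath(str)) -> int:
    """Consumer: read the upstream output from a local file path, not a value."""
    with open(data_path) as f:
        return sum(1 for _ in f)


if __name__ == "__main__":
    # Local check without KFP: call the plain functions with a temp file.
    with tempfile.TemporaryDirectory() as d:
        path = f"{d}/data.csv"
        make_data(1000, path)
        print(count_rows(path))  # prints 1000
```

In a KFP v1 pipeline these functions would be wrapped with `kfp.components.func_to_container_op`, and the SDK strips the `_path` suffix from parameter names, so the producer's output would be referenced as `outputs['data']`.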

What about passing large data items with reusable components (kfp.dsl.ContainerOp with the file_outputs arg)? The notebook tutorial only shows an example for lightweight components.

What about passing large data items with reusable components (kfp.dsl.ContainerOp with the file_outputs arg)? The notebook tutorial only shows an example for lightweight components.

Please check this tutorial for non-lightweight components:
https://github.com/Ark-kun/kfp_samples/tree/master/2019-10%20Kubeflow%20summit/106%20-%20Creating%20components%20from%20command-line%20programs

Hi @Ark-kun
I am using ContainerOp to write and read TFRecords. I am using file_outputs to define the paths to be consumed by the next component.

(Note: this is only part of my code. If you need to see more let me know.)

from kfp import dsl
from kfp.components import OutputPath

def preprocess_op(data_path: OutputPath(str)):
    return dsl.ContainerOp(
        name = 'preprocess_data',
        image = 'us.gcr.io/manceps-labs/preprocess:v0.2.0',
        command = 'python3',
        arguments = ['preprocess.py',
                     '--path', data_path],
        file_outputs = {
            'training_data': '/tmp/train_data_0.tfrecord',
            'test_data': '/tmp/test_data_0.tfrecord',
            'prediction_data': '/tmp/pred_data.tfrecord',
            'test_targets': '/tmp/test_target_stats.csv'
        }
    )

@dsl.pipeline(
    name = 'Predict Closing Price',
    description = 'Predict the closing price of a stock with at least 20 years of historical data.'
)
def predict_stock():
    preprocess_data = preprocess_op(DATA_PATH)

    train_model = train_op(DATA_PATH,
                           preprocess_data.outputs['training_data'],
                           preprocess_data.outputs['test_data'])

When I run the pipeline I get the error 'This step is in Error state with this message: failed to save outputs: Request entity too large: limit is 3145728'. I am running my pipeline on MicroK8s. I definitely want to use ContainerOp because I want to pull my images from GCP. Any advice on how I might get around this error?

Hey @lildonpancho

I am playing around with a similar workflow and ran into the same issue. I think I just figured it out. Change your code to something like the following and it should work.

    training_data = dsl.InputArgumentPath(preprocess_data.outputs['training_data'])
    test_data = dsl.InputArgumentPath(preprocess_data.outputs['test_data'])
    train_model = train_op(DATA_PATH,
                           training_data,
                           test_data)
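On the consuming side, `dsl.InputArgumentPath` makes KFP (v1) place the upstream output in the training container as a file and substitute that file's local path into the command line, so the training script receives paths rather than file contents. A minimal sketch of such an entry point, assuming hypothetical positional arguments in the same order as the `train_op(DATA_PATH, training_data, test_data)` call above (the real `train_op` is not shown in the thread):

```python
import argparse
import os
import sys


def parse_args(argv=None):
    # Hypothetical CLI: train_op is assumed to forward DATA_PATH and the two
    # InputArgumentPath values as positional arguments, in this order.
    parser = argparse.ArgumentParser(description="Training step entry point (sketch)")
    parser.add_argument("data_path", help="base data path passed as a plain value")
    parser.add_argument("training_data", help="local path of the training TFRecord file")
    parser.add_argument("test_data", help="local path of the test TFRecord file")
    return parser.parse_args(argv)


def main(argv=None) -> dict:
    args = parse_args(argv)
    sizes = {}
    for name in ("training_data", "test_data"):
        path = getattr(args, name)
        # With InputArgumentPath the argument is a file path, not the file's
        # contents, so large files never go through the size-limited value channel.
        if not os.path.isfile(path):
            sys.exit(f"{name}: expected a file path, got {path!r}")
        sizes[name] = os.path.getsize(path)
    # A real script would now build a tf.data.TFRecordDataset from these paths.
    return sizes


if __name__ == "__main__":
    print(main())
```

Keeping the script path-based also means it can be run locally against any files on disk, independent of how the pipeline delivers them.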