Pipelines: Parse KFP read_artifact data

Created on 5 Aug 2020  ·  14 Comments  ·  Source: kubeflow/pipelines

I am trying to use the read_artifact method in kfp.Client().runs. However, I am not sure how to parse the returned data.

If I open the link provided by the UI, I am able to view the data as:
["[0, 1, 2, 3, 4, 5, 6, 7]", "[8, 9, 10, 11, 12, 13, 14, 15]", "[16, 17, 18, 19, 20, 21, 22, 23]", "[24, 25, 26, 27, 28, 29, 30, 31]", "[32, 33, 34, 35, 36, 37, 38, 39]", "[40, 41, 42, 43, 44, 45, 46, 47]", "[48, 49, 50, 51, 52, 53, 54, 55]", "[56, 57, 58, 59, 60, 61, 62, 63]", "[64, 65, 66, 67, 68, 69, 70, 71]", "[72, 73, 74]"]

However, the response returned by read_artifact is:
{'data': 'H4sIAAAAAAAA/+yRscrcQAwG/SjD1V8haSWt/SzHFQfpf/hzef/g2NVfJKQ4QsDTqNhhGe1+e76ey5sxM+vMX9PMvk4rj8XH9EhPL1vMfVou2LvDdn58fz0/F7PPj4/X77w/nX9d7j/hfrubcBFiiBQlWszHTdzuq9iE74oLD+FDeAqvw/AWPoWvwjcRJmK/L0SMw4kUUSJaxBSxitjEMDH8cMYeMMRIMUqMFmOKsYqxHU6aSBcZIvfaFFkiW+RZnKvITZSJclEhaojaVzubq0VNUauoTbSJdtEh+mzuFL2/RYueolfRm5gm5tk8Q8whZj5uj3/9kxcXFxd/x88AAAD//+E0btIACAAA'}, where the return type of artifact.data is a string, although it is supposed to be bytes. How do I get the original data back?

I have also tried using wget with the link returned by the UI, but when I tar -xzvf the file, it gives me this error:

tar: Error opening archive: Unrecognized archive format

Version: kfp 1.0.0, kubeflow 1.0.2

area/backend area/sdk/client lifecycle/stale

All 14 comments

> where the return type of artifact.data is a string, although it is supposed to be bytes. How do I get the original data back?

Have you tried decoding using Base64 and then unzipping/untarring?

> I have also tried using wget with the link returned by the UI

Have you tried clicking on that link? I think it returns the actual (unarchived) artifact data.

How do you decode it when it is returned as a string? In the documentation, the data is defined to be "The bytes of the artifact content", although the return type is "str".

Also, I have clicked the link, but it just opens the link in the browser and displays the contents. I would like to download the contents instead (especially because I have some output artifacts that are pickled files, so displaying it in the UI would not suffice).

I think you can save the link as a local file.

I tried that, but I received the error tar: Error opening archive: Unrecognized archive format when trying to unzip it.
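A possible explanation, following the earlier suggestion that the UI link returns the unarchived artifact data: the downloaded file may simply not be a tar archive, in which case tar has nothing to unpack. The sketch below is an illustration rather than anything from the KFP SDK; `unpack_if_archive` is a hypothetical helper that uses only the standard library to try the bytes as a (possibly compressed) tar archive and falls back to returning them unchanged.

```python
import io
import tarfile


def unpack_if_archive(raw: bytes):
    """Return {member name: bytes} if raw is a tar archive, else raw unchanged.

    Hypothetical helper for illustration; not part of kfp.
    """
    try:
        # Default mode "r" detects gzip/bz2/xz compression transparently
        with tarfile.open(fileobj=io.BytesIO(raw)) as tar:
            return {
                member.name: tar.extractfile(member).read()
                for member in tar.getmembers()
                if member.isfile()
            }
    except tarfile.ReadError:
        # Not an archive: the download is already the plain artifact content
        return raw


# Plain JSON text, like the content shown in the browser, is passed through
plain = b'["[0, 1, 2, 3, 4, 5, 6, 7]", "[8, 9, 10]"]'
print(unpack_if_archive(plain) == plain)
```

If the fallback branch is taken for a file saved from the UI link, the file can be used directly without tar.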

> How do you decode it when it is returned as a string?

Base64 encodes bytes to string.
Base64 decodes string to bytes.
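To make the round trip concrete, here is a minimal sketch that uses only the Python standard library and no KFP calls. It builds a small .tar.gz in memory as a stand-in for the artifact, Base64-encodes it to a string, and then reverses the steps. The helper names `make_encoded_archive` and `decode_archive` are made up for this illustration. Note that the string in the question starts with `H4sI`, which is what the gzip magic bytes look like after Base64 encoding.

```python
import base64
import io
import tarfile


def make_encoded_archive(name: str, payload: bytes) -> str:
    """Build a .tar.gz in memory and return it as a Base64 string.

    Stand-in for the string held in artifact.data (hypothetical helper).
    """
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        info = tarfile.TarInfo(name=name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    # Base64 encodes bytes to string
    return base64.b64encode(buffer.getvalue()).decode("ascii")


def decode_archive(encoded: str) -> dict:
    """Reverse the steps: Base64 string -> bytes -> {member name: raw bytes}."""
    # Base64 decodes string to bytes
    raw = base64.b64decode(encoded)
    with tarfile.open(fileobj=io.BytesIO(raw), mode="r:gz") as tar:
        return {
            member.name: tar.extractfile(member).read()
            for member in tar.getmembers()
            if member.isfile()
        }


encoded = make_encoded_archive("data", b'["[0, 1, 2]"]')
members = decode_archive(encoded)
print(encoded[:4], members)
```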

@kxiao-fn (or anyone else stumbling on this), here's some code that I wrote to do this:

#!/usr/bin/env python3

import json
import tarfile
from base64 import b64decode
from io import BytesIO

import kfp


def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
    run = client.runs.get_run(run_id)
    workflow = json.loads(run.pipeline_runtime.workflow_manifest)
    nodes = workflow["status"]["nodes"]
    for node_id, node_info in nodes.items():
        if node_info["displayName"] == component_name:
            return node_id
    # No node matched the requested component name
    raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
    artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
    # Artifacts are returned as base64-encoded .tar.gz strings
    archive_bytes = b64decode(artifact.data)
    with tarfile.open(fileobj=BytesIO(archive_bytes)) as tar:
        member_names = tar.getnames()
        if len(member_names) == 1:
            # Single-member archive: return its contents as text
            return tar.extractfile(member_names[0]).read().decode('utf-8')
        # Is it possible for KFP artifacts to have multiple members?
        return {
            member_name: tar.extractfile(member_name).read().decode('utf-8')
            for member_name in member_names
        }


if __name__ == "__main__":
    run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
    component_name = "my-component"
    # For an output variable named "output_data"
    artifact_name = "my-component-output_data"

    client = kfp.Client()
    node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
    artifact = get_artifact(
        run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
    )
    # Do something with artifact ...
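One caveat for the pickled outputs mentioned earlier in the thread: get_artifact decodes every member as UTF-8, which suits text artifacts but will generally fail on binary data such as pickles. A hedged variation is to keep the raw bytes and deserialize them separately. The sketch below is standalone and does not call KFP; `read_single_member` is a hypothetical helper, and the in-memory archive stands in for the Base64-decoded artifact bytes.

```python
import io
import pickle
import tarfile


def read_single_member(archive_bytes: bytes) -> bytes:
    """Return the raw bytes of the only file in a tar archive (hypothetical helper)."""
    with tarfile.open(fileobj=io.BytesIO(archive_bytes)) as tar:
        files = [member for member in tar.getmembers() if member.isfile()]
        if len(files) != 1:
            raise ValueError(f"Expected exactly one file, found {len(files)}")
        return tar.extractfile(files[0]).read()


# Stand-in for b64decode(artifact.data): a .tar.gz holding one pickled object
original = {"rows": [0, 1, 2]}
payload = pickle.dumps(original)
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
    info = tarfile.TarInfo(name="data")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

raw = read_single_member(buffer.getvalue())
# Only unpickle data from a source you trust
restored = pickle.loads(raw)
print(restored)
```

Returning bytes and leaving the decoding step to the caller keeps one code path for both text and binary artifacts.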

Hi @sm-hawkfish that looks great!
Are you interested in contributing it?

/cc @neuromage

To make sure I understand the context correctly, we're talking about the Argo artifact rather than the MLMD artifact here, right?

Yes, it's the Argo artifact.

@Bobgy, I am glad you think it's useful! Yes, I would be interested in contributing it -- do you have any thoughts on where it should be added (and whether it should also be added to the CLI), or should I browse the SDK?

@sm-hawkfish Great!

I think kfp.Client is a good place. It is up to you whether adding it to the CLI is useful too. Of course, you can implement this across multiple PRs.

Hi @Bobgy -- I wanted to follow up on this. I just got approved by my employer to contribute to this repo, which is exciting news. I will work on submitting a Pull Request with some variation of the above code snippet sometime this week.

@sm-hawkfish That's awesome!

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
