Hi all,
This is a cross-post of discussion #3304. The proposed implementation would make it easier to share Python workflow libraries that are installed via pip/conda, in any type of environment, including LocalEnvironment.
LocalModule would behave like Local with stored_as_script=True, except that instead of path='/path/to/my/flows/exampleflow.py' you would pass module_path='myrepo.flows.exampleflow' (or the equivalent import statement, 'from myrepo.flows import exampleflow'). The flow is then accessible in any environment where that import works, not just in environments that share the same path structure.
In an example workflow library, workflows are separated into scripts and can thus be imported and used. One can easily do the following (pseudocode)...
from myrepo.workflows import exampleflow
exampleflow.flow.register(...)
Alternatively, rather than importing the entire script as above, it may be simpler to import just the Flow instance (the flow variable):
from myrepo.workflows.exampleflow import flow
flow.register(...)
where exampleflow.py looks like...
import prefect
from prefect import task, Flow
@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")
flow = Flow("hello-flow", tasks=[hello_task])
from prefect.environments.storage import LocalModule
flow.storage = LocalModule(module_path=__name__)  # when this file is imported, __name__ is 'myrepo.workflows.exampleflow'
As for the implementation, LocalModule should be fairly simple in terms of serialization and dynamic loading.
The serialization of a module is simply saving the path and version:
info = {
    'module': 'myrepo.workflows.exampleflow',
    'version': 'v0.0.1',
    'class_str': 'flow',  # (optional) the variable name I gave my Flow instance above
}
And dynamic loading can be done with...
module_path = info['module'] # grab the string from the serialized info
class_str = info['class_str'] # grab the name of the variable for Flow instance
# (optional) some version check -- maybe warn or raise error if there is a difference...?
# and now use the string to import
import importlib
module = importlib.import_module(module_path)
# and if the Flow is saved as a variable instance named 'flow', we can grab it using
flow = getattr(module, class_str)
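To make the loading steps above concrete, here is a minimal sketch that wraps them in one function. The name load_from_module, the default of 'flow' for a missing class_str, and the way the version is checked (against a __version__ attribute on the top-level package, if one exists) are all assumptions made for illustration, not part of Prefect.

```python
import importlib
import warnings


def load_from_module(info):
    """Import info['module'] and return the object named by info['class_str'].

    Hypothetical helper: `info` mirrors the serialized dict sketched above.
    """
    module = importlib.import_module(info["module"])

    # Optional version check (assumption: the top-level package exposes
    # __version__). Only warn, since a mismatch may still be usable.
    expected = info.get("version")
    package = importlib.import_module(info["module"].split(".")[0])
    installed = getattr(package, "__version__", None)
    if expected is not None and installed is not None and installed != expected:
        warnings.warn(
            "%s was serialized with version %s but %s is installed"
            % (info["module"], expected, installed)
        )

    # 'class_str' is optional in the serialized info; defaulting to 'flow'
    # is an assumption made for this sketch.
    name = info.get("class_str", "flow")
    try:
        return getattr(module, name)
    except AttributeError:
        raise ValueError(
            "module %r has no attribute %r" % (info["module"], name)
        ) from None
```

With the example library installed, load_from_module({'module': 'myrepo.workflows.exampleflow', 'class_str': 'flow'}) would return the Flow instance. The same call works for any importable module, so it can be tried against a standard-library module first.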
Hi @jacksund, the above design makes sense to me. Is this something you'd be interested in working on?
Sure thing! I'll use the Local storage class as a starting point and give this an attempt.
I wrote a mock version of the LocalModule class and I've attached my script below _(it has no version checking and only supports stored_as_script=True for the moment)_. I could use some feedback on the best way to test this, though. Will I be able to register a flow with this custom storage on Prefect Cloud? My best guess is to simply try it out with the following code...
from myrepo.workflows.myflow import flow
from mycustomcode.storage import LocalModule
flow.storage = LocalModule(module_path='myrepo.workflows.myflow')
flow_id = flow.register(project_name="Hello, World!")
When I try this, I get an error, and I'm not sure where I'm going wrong:
ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')
I'm trying to dig further into the error. The issue arises when I try to serialize the LocalModule storage class. When I run storage.serialize(build=True) and look at the result, I can see that the storage serialization failed and gave: ('storage', (None, {'_schema': 'Unsupported object type: LocalModule'})). My LocalModule class inherits from 'prefect.environments.storage.Storage', so I'm not sure why this is the case.
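One plausible reading of that message, offered as an assumption rather than a confirmed diagnosis: the serializer picks a schema by looking up the object's class name in a fixed table, so inheriting from Storage is not enough if the new class name is not in the table. The sketch below is an illustrative stand-in written in plain Python; the classes, TYPE_SCHEMAS, and serialize_storage are hypothetical and are not Prefect's code.

```python
class Storage:
    """Stand-in base class (hypothetical, not Prefect's Storage)."""


class Local(Storage):
    def __init__(self, path):
        self.path = path


class LocalModule(Storage):
    def __init__(self, module_path):
        self.module_path = module_path


# Schemas are looked up by class name, not by base class, so only the
# names listed here can be serialized.
TYPE_SCHEMAS = {
    "Local": lambda obj: {"type": "Local", "path": obj.path},
}


def serialize_storage(obj):
    """Return (data, errors), mimicking the shape of the result quoted above."""
    obj_type = type(obj).__name__
    schema = TYPE_SCHEMAS.get(obj_type)
    if schema is None:
        return None, {"_schema": "Unsupported object type: %s" % obj_type}
    return schema(obj), {}
```

In this sketch, serialize_storage(LocalModule('myrepo.workflows.myflow')) fails until an entry for "LocalModule" is added to TYPE_SCHEMAS, even though LocalModule subclasses Storage.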
A few comments:
Sorry about that. I hadn't made a gist before. Here's the code as a gist: link.
And I'm using Cloud, so that's the problem... I haven't tried Prefect Server yet, but I'll give it a go. Thank you!
And just to be clear @jacksund - if we accepted a new storage class, we would make sure it became Cloud compatible! For reference, here is another example of a user-contributed storage class that is now functional with Prefect Cloud: https://github.com/PrefectHQ/prefect/pull/3000
Server is useful for development purposes though 👍
Thanks for the link! I'll definitely need some help with the Cloud when we get there too.
For now, I have a bare-bones version working with Prefect Server in development mode along with a LocalAgent!! I could use some feedback too.
I've uploaded the forked repo to my profile. Sorry, but I'm new to this and not sure if I'm sharing correctly. Should I be giving more than just a link here? If that's all you need, here you go: localmod-storage
When you compare this code to the master branch, it's worth noting that I haven't finished the test_**.py files, so there are a number of commented-out lines there.
If you want to open a draft PR, that might be easier for providing commentary on your implementation line-by-line
I opened a PR for it. Thanks for taking a look 👍
I won't be able to continue this until next week (specifically until the 28th), but feel free to add comments to my changed files! If you'd like me to just add more comments to the code or clear up meaning in the doc strings, I can do that as well.
Closed by #3351