USE CASE:
I am trying to work with the same version of the flow that I uploaded to my S3 bucket. The issue is that storage.flows is empty, so the storage object does not find my existing flow in the S3 bucket.
If I explicitly pass the flow name and location (by setting storage.flows), I can access my flow through the get_flow method. Otherwise I get the error "Flow is not contained in this Storage".
Please let me know if I am missing anything.
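import pytest
from prefect import Flow
from prefect.environments.storage import S3  # Prefect 0.13-era import path (assumption)

# extract, transform and load are @task functions defined elsewhere in this module

@pytest.fixture
def dictFlows():
    return {'flows': {'ETL': 'etl/testflow'}, 'flow_location': 'etl/testflow'}

def test_add_flow_to_S3():
    storage = S3(bucket="test", key="etl/testflow")
    f = Flow("ETL")
    assert f.name not in storage
    with Flow("ETL") as f:
        e = extract()
        t = transform(e)
        l = load(t)
    flow_location = storage.add_flow(f)
    assert f.name in storage
    storage.build()

def test_get_flow_S3(dictFlows):
    print("i am in get flow")
    storage = S3(bucket="test", key="etl/testflow")
    # Register the flow by hand so get_flow() passes its membership check
    storage.flows = dictFlows['flows']
    newflow = storage.get_flow('etl/testflow')
    print("S3 FLOW OUTPUT")
    newflow.run()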
NOTE: I discussed the above issue in the Prefect Slack channel, and it looks like a bug.
Hi @mithalee, this isn't actually a bug; it is working as designed. S3 storage works fine when used in a flow run, but I see how one of its checks can be unhelpful when working with storage options locally or in tests. Essentially, before retrieving the flow, it checks that the flow object is attached to that storage object. As usage patterns have evolved, this check is no longer needed, because if the flow is not found when downloading from S3, the download fails anyway, which achieves the same purpose. Thanks for opening!
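The check described above can be reproduced without touching S3. The sketch below is a minimal illustration, not Prefect's S3 class: CheckedStorage is a hypothetical stand-in that assumes, as the thread describes, that add_flow() records each flow in a flows dict (flow name to location) and that get_flow() rejects any location missing from that dict.

```python
from typing import Optional


class CheckedStorage:
    """Hypothetical stand-in that mimics the pre-fix membership check."""

    def __init__(self, bucket: str, key: Optional[str] = None):
        self.bucket = bucket
        self.key = key
        # flow name -> flow location; filled in only by add_flow()
        self.flows = {}

    def add_flow(self, flow_name: str) -> str:
        # Assumption: reuse the configured key, else fall back to the flow name
        location = self.key or flow_name
        self.flows[flow_name] = location
        return location

    def get_flow(self, flow_location: str) -> str:
        # The check that trips up a freshly created storage object
        if flow_location not in self.flows.values():
            raise ValueError("Flow is not contained in this Storage")
        # A real implementation would download and deserialize the flow here
        return f"s3://{self.bucket}/{flow_location}"


# A fresh object knows nothing about flows uploaded earlier, so the check fails
fresh = CheckedStorage(bucket="test", key="etl/testflow")
try:
    fresh.get_flow("etl/testflow")
except ValueError as exc:
    print(exc)  # Flow is not contained in this Storage

# Calling add_flow() first (or populating flows by hand) satisfies the check
fresh.add_flow("ETL")
print(fresh.get_flow("etl/testflow"))  # s3://test/etl/testflow
```

This is why a storage object created fresh in a separate test, as in test_get_flow_S3, fails until flows is populated by hand.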
@joshmeek
def get_flow(self, flow_location: str = None):
    if flow_location:
        if flow_location not in self.flows.values():
            # we only do this check when add_flow() has been called
            raise ValueError("Flow is not contained in this Storage")
    elif self.key:
        flow_location = self.key
    else:
        raise Exception("No flow location provided")
I think it's fair to assume that if the S3 Storage object is initialized to represent a full S3 object, i.e. both bucket and key, the caller knows exactly which flow they want to get.
my_s3_obj = S3(bucket=my_bucket, key=flow_key)
retrieved_flow = my_s3_obj.get_flow()
But if flow_location is provided as an argument, we can give it precedence, so there is no breaking change for existing workflows where add_flow() is called before get_flow().
my_s3_obj = S3(bucket=my_bucket, key=flow_key)  # or S3(bucket=my_bucket)
new_flow = Flow("Fancy Flow")
new_loc = my_s3_obj.add_flow(new_flow)
my_s3_obj.get_flow(new_loc)
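Both usage patterns from the proposal can be exercised end to end with an in-memory stand-in. In this sketch, FakeFlow, FAKE_BUCKET and InMemoryS3Storage are hypothetical names, a dict plays the role of the S3 bucket, and flows are serialized as JSON purely for illustration (Prefect's storage uses its own serialization). get_flow() follows the precedence proposed above.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Optional


@dataclass
class FakeFlow:
    """Hypothetical stand-in for a Prefect Flow; only the name is stored."""
    name: str


# A dict standing in for the S3 bucket: object key -> serialized bytes
FAKE_BUCKET: Dict[str, bytes] = {}


class InMemoryS3Storage:
    """Hypothetical storage object that applies the proposed get_flow() rules."""

    def __init__(self, key: Optional[str] = None):
        self.key = key
        self.flows: Dict[str, str] = {}  # flow name -> location, set by add_flow()

    def add_flow(self, flow: FakeFlow) -> str:
        # Assumption: reuse the configured key, else fall back to the flow name
        location = self.key or flow.name
        self.flows[flow.name] = location
        FAKE_BUCKET[location] = json.dumps(asdict(flow)).encode()
        return location

    def get_flow(self, flow_location: Optional[str] = None) -> FakeFlow:
        if flow_location:
            # Explicit location: keep the check for add_flow()-based workflows
            if flow_location not in self.flows.values():
                raise ValueError("Flow is not contained in this Storage")
        elif self.key:
            # Full bucket + key given at init: trust the caller
            flow_location = self.key
        else:
            raise ValueError("No flow location provided")
        # A missing object raises KeyError here, standing in for a failed S3 download
        return FakeFlow(**json.loads(FAKE_BUCKET[flow_location]))


# Pattern 1: add_flow() then get_flow() on the same object
uploader = InMemoryS3Storage(key="etl/testflow")
location = uploader.add_flow(FakeFlow("ETL"))
print(uploader.get_flow(location).name)  # ETL

# Pattern 2: a fresh object initialized with the same key, no add_flow() call
reader = InMemoryS3Storage(key="etl/testflow")
print(reader.get_flow().name)  # ETL
```

The second pattern is the original use case: a fresh storage object initialized with the same bucket and key retrieves the uploaded flow without add_flow() having been called on it.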
@berosen Yeah I like that proposal! Best of both worlds
Yeah, looks like the way I tried is a good way to go then. Thank you, guys.
@joshmeek would you mind assigning this issue to me? I can put together a PR for it.
Yeah of course! @berosen
Closed by #3030