Azure-sdk-for-python: ADF Question: DatasetDateTimePartitionValue( ) date_property possible values and how to use {year}, {month} placeholders

Created on 1 Nov 2017  ·  11Comments  ·  Source: Azure/azure-sdk-for-python

I've been trying to dynamically change the folder path of an Azure Data Lake Store dataset following the logic in ADF V1 where a variable called SliceStart would let us get the date of when the data slice was being executed.

Now, the use case is similar, however I'd like to get the last time (datetime) an activity was triggered successfully, regardless of this use case, I wanted to first test the dynamic folder path functionality but I have not been able to do so using ADF V2 Python SDN.

I'm using the DatasetDateTimePartitionValue() class , and I'm not sure about some of its parameters, for example the param "date_property", according to the comments in dataset_date_time_partition_value.py , it accepts the name of variable containing date, however I'm not sure of which variables are these, I tried to use SliceStart and the placeholders {year}, {month} and was not successful.

This is how I do it

year = DatasetPartition(name='year', value=DatasetDateTimePartitionValue(date_property='SliceStart', format='yyyy'))
month = DatasetPartition(name='month', value=DatasetDateTimePartitionValue(date_property='SliceStart', format='MM'))
day = DatasetPartition(name='day', value=DatasetDateTimePartitionValue(date_property='SliceStart', format='dd'))

partitioned_by = [year, month, day]

 params_for_dataset = {}

ds_properties = AzureDataLakeStoreDataset(linked_service_name=ls_reference,
                                                  folder_path='ingest/folder_name/{year}-{month}-{day}',
                                                  file_name=file_name,
                                                  format=ds_format,
                                                  structure=schema_structure,
                                                  partitioned_by=partitioned_by,
                                                  parameters=params_for_dataset)

adf_client.datasets.create_or_update(resource_group_name='rg_name',
                                             factory_name='df_name',
                                             properties=ds_properties,
                                             dataset_name='ds_name')

the error I get is that 'ingest/folder_name/{year}-{month}-{day} folder does not exist, I've tried to use expressions as well but nothing, what am I missing.

it'd be nice to see what possible values we can get in date_property and the other classes as well.

some of the references that I've been following:
-https://docs.microsoft.com/en-us/azure/data-factory/control-flow-system-variables
-https://docs.microsoft.com/en-us/azure/data-factory/v1/data-factory-functions-variables (not sure if the same variables exist for V2 at least for activity scope)
-https://docs.microsoft.com/en-us/azure/data-factory/concepts-datasets-linked-services

This is how it was done using ADF V1:
https://blogs.msdn.microsoft.com/data_insights_global_practice/2015/10/19/slicing-and-dicing-configuring-external-data-input-to-azure-data-factory/

Data Factory Service Attention question

All 11 comments

Update:
I'm able to use dynamic paths by using expressions within the folder path parameter, for example

@{formatDateTime(trigger().endTime,'yyyy-MM-dd')}

This is the code

params_for_dataset = {}

ds_properties = AzureDataLakeStoreDataset(linked_service_name=ls_reference,
                                                  folder_path="@{formatDateTime(trigger().endTime,'yyyy-MM-dd')}",
                                                  file_name=file_name,
                                                  format=ds_format,
                                                  structure=schema_structure,
                                                  parameters=params_for_dataset)

adf_client.datasets.create_or_update(resource_group_name='rg_name',
                                             factory_name='df_name',
                                             properties=ds_properties,
                                             dataset_name='ds_name')

However this still does not solve the problem which is an environment variable at the activity/dataset level, there are only pipeline() and trigger() scope variables. I came up a very rudimentary workaround which updates the folder_path every day, however I need to maintain a service that runs this script everyday, if you happen to know the answer, it'd be greatly appreciated.

for now I have created a ticket with the Microsoft ADF team, it's been three days and no answer =), once they get back to me I will provide the answer.

Hi Saul Cruz,

You can declare a parameter named ScheduledTime on both pipeline and dataset.
From trigger, when referencing pipeline, pass value of @trigger().scheduledTime, or some expression of it, as parameter value, like this:
{
"pipelineReference": {
"type": "PipelineReference",
"referenceName": "Pipeline-1"
},
"parameters": {
"ScheduledRunTime": "@addhours(trigger().scheduledTime, -2)"
}
}
From pipeline, in activity, when referencing dataset, pass value of @pipeline().parameters.ScheduledRunTime or expression of it, as parameter value, like this:

            "outputs": [
                { 
                    "referenceName": "Blob_Output",
                    "type": "DatasetReference",
                    "parameters": {
                        "ScheduledRunTime": "@pipeline().parameters.ScheduledRunTime"
                    }
                }
            ]

From dataset, depending on type, use expressions in terms of @dataset().ScheduledRunTime to determine partitioning, here’s an example with a blob:

    "typeProperties": {
        "fileName": {
            "value": "@concat('Data.', formatDateTime(dataset().ScheduledRunTime, 'HH'), '.txt')",
            "type": "Expression"
        },
        "folderPath": {
            "value": "@concat('compliantstore/V2/ADFMds/Billing/year=', formatDateTime(dataset().ScheduledRunTime, 'yyyy'), '/month=', formatDateTime(dataset().ScheduledRunTime, 'MM'), '/day=', formatDateTime(dataset().ScheduledRunTime, 'dd'))",
            "type": "Expression"
        },
        "format": {
            "type": "TextFormat"
        }

And then, the dataset can partition in terms of the trigger’s scheduled time, similar to a V1 slice.

At that point, can query pipeline runs and check both the parameter value with ScheduledRunTime and also the actual run time and whether it succeeded, and also for a specific pipeline run can query activity runs to check individual activity run success and failure.

@kalyan08 thanks for the suggestion, I tried something very similar, however this is only at the trigger or pipeline scope right? from what I remember SliceStart in V1 was using the dataset/activity scope which is very handy as that's the actual level that the folder name depends on , correct me if I'm wrong.

For example, if the activity/dataset ran yesterday but failed, I'd like to pick up where it left and still point to the adls folder of that date, however other activities in the same pipeline might've been successful, these ones should point to the newest/current folder.

I think there are two possible approaches to the scenario we could recommend, both that are easiest to describe with scheduled trigger calling a “master pipeline” with the desired “slice” based on trigger scheduledTime, which uses a custom activity to query monitoring for previous runs of regular pipeline and decides what to do next.

Approach 1: Master pipeline uses custom activity to query monitoring for the immediately previous expected regular pipeline run, with special case for the first run or bootstrap with an initial manual run. If that run succeeded, run the current desired slice of the regular pipeline, otherwise do nothing or maybe stop the trigger. When you have investigated and addressed root cause of failure, you can loop over createRun one at a time to catch up, and restart trigger if it was stopped.

Approach 2: Master pipeline with concurrency 1 understands both calendar and slice parameter for earliest expected regular pipeline run, and uses custom activity to query monitoring starting at first in range missing or failed run, and generates list of all “slice” parameters starting with that and ending with current desired “slice”. Then, master pipeline uses foreach with concurrency 1 sequential execution to attempt to catch up by running regular pipeline with each of those “slice” parameters using executePipeline activity with wait, ensuring that the foreach loop exits immediately if any of those runs fails

@kalyan08 thanks for your response, do you have a simple example ? if not that's fine, i will try to implement a custom activity (I've never tried it before)

what I'm doing now is getting the activity latest metadata using python (get the latest status and get the date if it was successful, otherwise just leave it as it is) and every time i run my python script to update accordingly the dataset path, this works for now however the trade-off is to run this python script in one of my servers.

Hopefully at some point there will be a system variable something like activity().lastSuccessfulRun or at least lastExecutionTime and be able to implement this without a lot of unnecessary logic in your pipelines,

This is what I'm doing for now:


    def get_activity_latest_metadata(self):
        for pipeline_item in adf_client.pipelines.list_by_factory(self.resource_group_name, self.data_factory_name):
            for act_item in pipeline_item.activities:
                result = {}
                act_name_str = str(act_item.name)
                result[act_name_str] = {'last_successful_run': None}
                pl_filter_parameters = PipelineRunFilterParameters(last_updated_after='2017-11-07T12:07:40-05:00',
                                                                   last_updated_before='2017-12-07T12:07:50-05:00')
                pl_runs = self.adf_client.pipeline_runs.query_by_factory(resource_group_name=self.resource_group_name,
                                                                         factory_name=self.data_factory_name,
                                                                         filter_parameters=pl_filter_parameters)
                # Get Metadata of previous runs
                list_of_pl_runs = pl_runs.value
                num_of_pl_runs = len(list_of_pl_runs)
                for pl_run in reversed(list_of_pl_runs):
                    # status = pl_run.status
                    run_id = pl_run.run_id
                    # duration_in_ms = pl_run.duration_in_ms
                    # message = pl_run.message

                    act_runs = self.adf_client.activity_runs.list_by_pipeline_run(rg_name, df_name, run_id,
                                                                                  start_time=pl_run.run_start,
                                                                                  end_time=pl_run.run_end,
                                                                                  activity_name=act_name_str)
                for act in act_runs:
                    # test_act = act
                    act_name = act.activity_name
                    # act_run_id = act.activity_run_id
                    act_status = act.status
                    # act_error = act.error
                    act_run_end = act.activity_run_end
                    if act_status == 'Successful':
                        result[act_name] = {'last_successful_run': act_run_end}
                        break
        return result

Update:

The Microsoft team reached out to me about this issue and stated: “After discussion with PM team, I was communicated that as of now there is no easy work around to achieve this scenario. “

Hi Saulcruz,

Trying to implement same logic in Python but when i am reading the trigger values from your above logic
(@{formatDateTime(trigger().endTime,'yyyy-MM-dd')}) its was not returning any values.
Please let me know if i am missing some things.

Thanks in Advance,
Satya

@hvermis any update on this issue? Thanks!

@Satyadeep-Sinha Please refer to this doc for correct varibale names:
https://docs.microsoft.com/en-us/azure/data-factory/control-flow-system-variables#schedule-trigger-scope
If your trigger is tumbling window trigger, then you need to use:
trigger().outputs.windowEndTime
instead of trigger().endTime

@saulcruz is your main problem to be able to rerun the failed runs? We added new options to create run
https://github.com/Azure/azure-rest-api-specs/blob/master/specification/datafactory/resource-manager/Microsoft.DataFactory/stable/2018-06-01/datafactory.json#L2272
called isRecovery and startActivityName, which will take your parameter values and rerun the your run starting from activity you specified. There is one more property added to the runs record called RunGroupId which groups the runs by assigning them the same group id (the pipeline run ID) if they were reran using isRecovery option.
https://github.com/Azure/azure-rest-api-specs/blob/master/specification/datafactory/resource-manager/Microsoft.DataFactory/stable/2018-06-01/datafactory.json#L4095
his will be released in our next Python SDK release. Does that help you?

@hvermis Thanks. The original use case was to incrementally and dynamically get files from ADLS based on the folder name and a watermark value (last successful date of an Activity Run).

/folder_name/YYYYMMDDTHHmmssZ.txt

I ended up using U-SQL to do this, I pass the watermark (which is stored in activity metadata custom database) value to my U-SQL activity and use the following U-SQL script to extract the files I need;

// temp variables will be replaced by ADF paramaters
// DECLARE @watermark string = "2018-12-06T16:50:31";
// DECLARE @adls_path string = "adl://some-name.azuredatalakestore.net"; 
// End of temp variables 

DECLARE @file_set_path string = @adls_path + "/folder_name/{date_utc:yyyy}{date_utc:MM}{date_utc:dd}T{date_utc:HH}{date_utc:mm}{date_utc:ss}Z.txt";

@data =
    EXTRACT [ID] string,
            [created_at] string,
            [created_by_email] string,
            [created_by_name] string,
            [updated_at] string,
            date_utc DateTime
    FROM @file_set_path
    USING Extractors.Text(delimiter: '\u0001', skipFirstNRows : 1, quoting:false);

@result =
    SELECT [ID] ,
            [created_at] ,
            [created_by_email] ,
            [created_by_name] ,
            [updated_at] ,
           date_utc.ToString("yyyy-MM-ddTHH:mm:ss") AS SourceExtractDateUTC
    FROM @data
    WHERE date_utc > DateTime.ParseExact(@watermark, "yyyy-MM-ddTHH:mm:ss", NULL);

OUTPUT @result TO @adls_path + "/stage/Output.txt" USING Outputters.Text(delimiter: '\u0001', outputHeader:true);

However, those links definitely help for other use cases that we have in mind.
Thanks again.

Was this page helpful?
0 / 5 - 0 ratings