Apache Airflow version: 1.10.4
Environment:
Kernel (uname -a): Linux 9f23d34e2f14 4.19.76-linuxkit #1 SMP Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux
What happened: I installed the backport providers amazon package (apache-airflow-backport-providers-amazon), version 2020.6.24 and also 2020.10.29 (the latest). The AWSBatchOperator does not support the Batch parameters option for replacing placeholder variables in the Batch command. In the Airflow GitHub 1.10 source branch, the code shows that it supports passing Batch parameters to AWS Batch. I looked at the installed module source of the provider package and confirmed that the installed code is out of sync with the Airflow GitHub 1.10 branch source.
What you expected to happen: I expected the AWSBatchOperator to take the Batch parameters values and send them to AWS so that they replace the placeholder variables in the Batch command.
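At the AWS API level, the expected behaviour corresponds to boto3's Batch client submit_job call, which accepts a parameters mapping alongside containerOverrides. The sketch below only builds that request so it can be inspected without AWS credentials; build_submit_job_kwargs is a hypothetical helper name, and the values are the ones from the reproduction in this issue.

```python
def build_submit_job_kwargs(job_name, job_queue, job_definition, command, parameters):
    """Build keyword arguments for boto3's Batch ``submit_job`` call (hypothetical helper)."""
    return {
        "jobName": job_name,
        "jobQueue": job_queue,
        "jobDefinition": job_definition,
        # Values AWS Batch substitutes for Ref::<name> placeholders in the command
        "parameters": dict(parameters),
        # Command containing the placeholders, overriding the job definition's command
        "containerOverrides": {"command": list(command)},
    }


kwargs = build_submit_job_kwargs(
    job_name="airflow-job-submission-and-run-example",
    job_queue="dev-test-queue",
    job_definition="testwithparams",
    command=["-b", "Ref::bucket", "-v", "Ref::testfile", "-r", "Ref::serfile"],
    parameters={"bucket": "mybucket", "testfile": "testfile.txt", "serfile": "testref.ser"},
)

# With AWS credentials configured, the request would then be sent with:
#   import boto3
#   boto3.client("batch").submit_job(**kwargs)
```

The point of the sketch is that parameters is a top-level argument of submit_job, not part of containerOverrides, so an operator has to forward it explicitly.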
How to reproduce it: Create an AWS Batch job queue, job definition, etc. In the job definition, leave Command and Parameters empty. In the DAG, add an AWSBatchOperator whose overrides include a command with placeholder variables, and pass parameters with the values that should replace them. For example:
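```python
from datetime import datetime

# Airflow 1.10 contrib operator (the import used when this issue was reported)
from airflow.contrib.operators.awsbatch_operator import AWSBatchOperator

# `dag` is the DAG object defined earlier in the DAG file
aws_job_submission = AWSBatchOperator(
    task_id='aws-batch-job-submission',
    dag=dag,
    aws_conn_id='batch_dev',
    job_name='airflow-job-submission-and-run-' + datetime.today().strftime('%Y-%m-%d'),
    job_definition='testwithparams',
    job_queue='dev-test-queue',
    overrides={'command': ['-b', 'Ref::bucket', '-v', 'Ref::testfile', '-r', 'Ref::serfile']},
    parameters={
        "bucket": "mybucket",
        "testfile": "testfile.txt",
        "serfile": "testref.ser"
    }
)
```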
This problem occurs every time.
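For reference, this is what the overrides and parameters above are meant to produce: AWS Batch replaces each Ref::<name> placeholder in the command with the value of the matching entry in parameters. The sketch approximates that locally. substitute_placeholders is a hypothetical helper; it assumes each placeholder is a whole command argument, and the KeyError on a missing parameter is only a local sanity check, not a description of how AWS Batch itself behaves.

```python
PLACEHOLDER_PREFIX = "Ref::"


def substitute_placeholders(command, parameters):
    """Locally approximate Ref:: substitution (assumes whole-argument placeholders)."""
    names = [arg[len(PLACEHOLDER_PREFIX):] for arg in command if arg.startswith(PLACEHOLDER_PREFIX)]
    # Local pre-flight check: report placeholders that have no matching parameter
    missing = [name for name in names if name not in parameters]
    if missing:
        raise KeyError(f"no parameter for placeholders: {missing}")
    # Replace each Ref::<name> argument with its value; pass other arguments through
    return [
        parameters[arg[len(PLACEHOLDER_PREFIX):]] if arg.startswith(PLACEHOLDER_PREFIX) else arg
        for arg in command
    ]


command = ["-b", "Ref::bucket", "-v", "Ref::testfile", "-r", "Ref::serfile"]
parameters = {"bucket": "mybucket", "testfile": "testfile.txt", "serfile": "testref.ser"}
print(substitute_placeholders(command, parameters))
# ['-b', 'mybucket', '-v', 'testfile.txt', '-r', 'testref.ser']
```

If the operator never forwards parameters, the container would not receive these values, which matches the behaviour reported here.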
Here is the source code of the AWSBatchOperator constructor and execute method from the installed module. It shows that the parameters option is not supported: the constructor has no parameters argument, and submit_job is called without one.
```python
@apply_defaults
def __init__(self, job_name, job_definition, job_queue, overrides, max_retries=4200,
             aws_conn_id=None, region_name=None, **kwargs):
    super(AWSBatchOperator, self).__init__(**kwargs)
    self.job_name = job_name
    self.aws_conn_id = aws_conn_id
    self.region_name = region_name
    self.job_definition = job_definition
    self.job_queue = job_queue
    self.overrides = overrides
    self.max_retries = max_retries
    self.jobId = None
    self.jobName = None
    self.hook = self.get_hook()

def execute(self, context):
    self.log.info(
        'Running AWS Batch Job - Job definition: %s - on queue %s',
        self.job_definition, self.job_queue
    )
    self.log.info('AWSBatchOperator overrides: %s', self.overrides)

    self.client = self.hook.get_client_type(
        'batch',
        region_name=self.region_name
    )

    try:
        response = self.client.submit_job(
            jobName=self.job_name,
            jobQueue=self.job_queue,
            jobDefinition=self.job_definition,
            containerOverrides=self.overrides)

        self.log.info('AWS Batch Job started: %s', response)

        self.jobId = response['jobId']
        self.jobName = response['jobName']

        self._wait_for_task_ended()
        self._check_success_task()

        self.log.info('AWS Batch Job has been successfully executed: %s', response)
    except Exception as e:
        self.log.info('AWS Batch Job has failed executed')
        raise AirflowException(e)
```
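The same conclusion can be reached without reading the source, by inspecting the constructor signature of the installed class. A minimal sketch using the standard library's inspect.signature; accepts_argument and passthrough are hypothetical names, and the two stand-in classes only mimic the constructor shapes so the example runs without Airflow installed.

```python
import functools
import inspect


def accepts_argument(operator_cls, name):
    """Return True if operator_cls.__init__ explicitly declares an argument `name`.

    A catch-all **kwargs does not count. inspect.signature follows __wrapped__,
    so constructors wrapped by a functools.wraps-based decorator still show
    their original arguments.
    """
    return name in inspect.signature(operator_cls.__init__).parameters


def passthrough(func):
    """Hypothetical stand-in decorator that wraps like functools-based decorators do."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


class ContribStyleOperator:
    """Mimics the 1.10.4 constructor quoted above: no `parameters` argument."""
    @passthrough
    def __init__(self, job_name, job_definition, job_queue, overrides,
                 max_retries=4200, aws_conn_id=None, region_name=None, **kwargs):
        pass


class ProviderStyleOperator:
    """Mimics a constructor that declares `parameters` explicitly."""
    @passthrough
    def __init__(self, job_name, job_definition, job_queue, overrides,
                 parameters=None, aws_conn_id=None, region_name=None, **kwargs):
        pass


print(accepts_argument(ContribStyleOperator, "parameters"))   # False
print(accepts_argument(ProviderStyleOperator, "parameters"))  # True
```

With Airflow installed, the same call can be pointed at the actually imported operator class instead of the stand-ins; given the constructor quoted above, the 1.10.4 contrib AWSBatchOperator would be expected to return False.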
Backport packages are based on Airflow 2.0 (the master branch), so you should be looking at a different branch:
https://github.com/apache/airflow/blob/master/airflow/providers/amazon/aws/operators/batch.py#L37
When I installed the backport providers amazon package, I had to import from airflow.contrib.operators.awsbatch_operator. The link for Airflow 1.10 on https://pypi.org/project/apache-airflow-backport-providers-amazon/ points to https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/operators/awsbatch_operator.py. The operator source code in the GitHub v1-10-stable branch uses the Batch parameters correctly, but the package does not. When I inspect the operator source code in the package, it does not use the parameters at all, and it does not match v1-10-stable. It looks like the package is well behind and does not include the v1-10-stable code. Shouldn't the package reflect the branch source code?
The operator on the master branch has the same interface:
https://github.com/apache/airflow/blob/980c7252c0f28c251e9f87d736cd88d6027f3da3/airflow/providers/amazon/aws/operators/batch.py#L101-L116
as the one from v1-10-stable:
But the code you posted comes from the 1.10.4 version of the operator:
https://github.com/apache/airflow/blob/a18283911750017a01d538bcdd48283b6a265bfb/airflow/contrib/operators/awsbatch_operator.py#L66-L69
To use operators from the backport providers, you need to import from airflow.providers.amazon.aws...
You should import operators from airflow.providers.amazon...
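Because the backport package adds airflow.providers modules while leaving the old contrib modules untouched, both locations can be importable in the same environment. A quick way to check which ones are present is importlib.util.find_spec. A minimal sketch; module_available is a hypothetical helper name, and the two module paths are the ones linked in this thread.

```python
import importlib.util


def module_available(dotted_name):
    """Return True if `dotted_name` can be found on the current Python path."""
    try:
        # find_spec imports parent packages first, so a missing parent raises
        # ModuleNotFoundError instead of returning None
        return importlib.util.find_spec(dotted_name) is not None
    except ModuleNotFoundError:
        return False


for name in (
    "airflow.contrib.operators.awsbatch_operator",   # Airflow 1.10 contrib location
    "airflow.providers.amazon.aws.operators.batch",  # backport providers location
):
    print(name, module_available(name))
```

If the providers path is available, the operator to import from it is the providers class, not the contrib one.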
Please read the summary at https://pypi.org/project/apache-airflow-backport-providers-amazon/
In Airflow 2.0, all operators, transfers, hooks, sensors, secrets for the amazon provider are in the airflow.providers.amazon package. You can read more about the naming conventions used in Naming conventions for provider packages.
Then in the table of "Moved operators" you have:
Airflow 2.0 operators: airflow.providers.amazon package | Airflow 1.10.* previous location (usually airflow.contrib)
So the bottom line, as @turbaszek and @mik-laj tried to explain, is that the backport provider packages provide the "airflow.providers" classes, and those are the ones you should use. They do not modify the old "contrib" classes.
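The "Moved operators" table can be read as a mapping from old contrib class paths to new airflow.providers class paths. A minimal sketch of that idea; MOVED_OPERATORS, provider_path and import_by_path are hypothetical names, and only the AWS Batch entry discussed in this thread is listed (the full table is on the PyPI page). Note that the class name changes case as well as module.

```python
import importlib

# Old (Airflow 1.10 contrib) class path -> backport provider class path.
# Only the AWS Batch entry from this thread; see the PyPI table for the rest.
MOVED_OPERATORS = {
    "airflow.contrib.operators.awsbatch_operator.AWSBatchOperator":
        "airflow.providers.amazon.aws.operators.batch.AwsBatchOperator",
}


def provider_path(old_path):
    """Return the airflow.providers path for a moved class, or the input unchanged."""
    return MOVED_OPERATORS.get(old_path, old_path)


def import_by_path(dotted_path):
    """Import 'package.module.ClassName' and return the named attribute."""
    module_name, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


new_path = provider_path("airflow.contrib.operators.awsbatch_operator.AWSBatchOperator")
print(new_path)
# airflow.providers.amazon.aws.operators.batch.AwsBatchOperator
```

With the backport package installed, import_by_path(new_path) would return the providers operator class.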
So everything is as expected. I hope this explains it. If it is still unclear and you would like to improve the description, I invite you to describe it in a way that will be easier for new users to understand. We tried to explain it well, but maybe we made some assumptions that new users do not grasp easily. I would really love to improve the description.
Here is the template we are using: https://github.com/apache/airflow/blob/master/provider_packages/BACKPORT_PROVIDER_README_TEMPLATE.md.jinja2 - happy to approve your PR there.
For whoever stumbles across this:
I was tripped up by the case sensitivity. I was importing AWSBatchOperator. Instead I should be importing AwsBatchOperator.
Thanks for the help.
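Putting the resolution together, the reproduction task can be rewritten against the providers class, which declares a parameters argument. A minimal sketch, assuming the backport providers amazon package is installed; build_batch_task is a hypothetical factory name, and the import sits inside it only so the sketch loads without Airflow (a DAG file would normally import at the top).

```python
from datetime import datetime


def build_batch_task(dag):
    """Hypothetical factory: the reproduction task, rewritten for the provider class."""
    # AwsBatchOperator (airflow.providers) accepts `parameters`;
    # the 1.10.4 contrib AWSBatchOperator quoted above does not.
    from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator

    return AwsBatchOperator(
        task_id="aws-batch-job-submission",
        dag=dag,
        aws_conn_id="batch_dev",
        job_name="airflow-job-submission-and-run-" + datetime.today().strftime("%Y-%m-%d"),
        job_definition="testwithparams",
        job_queue="dev-test-queue",
        # Command with Ref:: placeholders...
        overrides={"command": ["-b", "Ref::bucket", "-v", "Ref::testfile", "-r", "Ref::serfile"]},
        # ...and the values AWS Batch substitutes for them
        parameters={
            "bucket": "mybucket",
            "testfile": "testfile.txt",
            "serfile": "testref.ser",
        },
    )
```

The only changes from the original reproduction are the import location and the class name casing.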