Repository: incubator-airflow
Updated Branches:
  refs/heads/master c5f663387 -> c65f403a5


[AIRFLOW-589] Add templatable job_name

The job name is the name that will appear in the
DataProc web console. It is helpful to have a
one-to-one mapping between the Airflow task and the
job running on the cluster. Adding a templated
parameter allows you to customize how Airflow
constructs the job name.

The default is {{task_id}} + {{ds_nodash}} + a
random hash.
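
For illustration, a minimal DAG sketch (not part of this patch; the DAG id,
schedule, query and bucket paths are made-up placeholders) showing how the new
parameter can be overridden with a custom Jinja template, following the
operator signature in the diff below:

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

# Hypothetical example DAG; only the job_name argument is the new piece.
dag = DAG('dataproc_job_name_example',
          start_date=datetime(2016, 10, 24),
          schedule_interval='@daily')

t1 = DataProcPigOperator(
    task_id='dataproc_pig',
    query='a_pig_script.pig',
    variables={'out': 'gs://example/output/{{ds}}'},
    # Rendered by Jinja at runtime; the hook still appends "_" plus a random
    # 8-character suffix, so repeated runs never collide on jobId.
    job_name='nightly_{{task.task_id}}_{{ds_nodash}}',
    dataproc_cluster='cluster-1',
    dag=dag)
```

Leaving job_name unset falls back to the default template
'{{task.task_id}}_{{ds_nodash}}' shown in the diff.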

Closes #1847 from alexvanboxel/feature/airflow-589-dataproc-templated-job-name


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c65f403a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c65f403a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c65f403a

Branch: refs/heads/master
Commit: c65f403a532941136aa62fa978f18ba82ce0ae7d
Parents: c5f6633
Author: Alex Van Boxel <a...@vanboxel.be>
Authored: Mon Oct 24 07:46:47 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Mon Oct 24 07:46:53 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py     |   3 +
 airflow/contrib/operators/dataproc_operator.py | 106 ++++++++++++++------
 2 files changed, 77 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c65f403a/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index fab71c5..e77c951 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -126,6 +126,9 @@ class _DataProcJobBuilder:
     def set_python_main(self, main):
         self.job["job"][self.job_type]["mainPythonFileUri"] = main
 
+    def set_job_name(self, name):
+        self.job["job"]["reference"]["jobId"] = name + "_" + 
str(uuid.uuid1())[:8]
+
     def build(self):
         return self.job
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c65f403a/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 33e5f79..2955d26 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -43,11 +43,11 @@ class DataProcPigOperator(BaseOperator):
     t1 = DataProcPigOperator(
         task_id='dataproc_pig',
         query='a_pig_script.pig',
-        variables={'out': 'gs://example/output/{ds}'},
+        variables={'out': 'gs://example/output/{{ds}}'},
     dag=dag)
     ```
     """
-    template_fields = ['query', 'variables']
+    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
     template_ext = ('.pg', '.pig',)
     ui_color = '#0273d4'
 
@@ -56,6 +56,7 @@ class DataProcPigOperator(BaseOperator):
             self,
             query,
             variables=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_pig_properties=None,
             dataproc_pig_jars=None,
@@ -74,6 +75,10 @@ class DataProcPigOperator(BaseOperator):
         :type query: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in
@@ -94,6 +99,7 @@ class DataProcPigOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.query = query
         self.variables = variables
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_pig_properties
         self.dataproc_jars = dataproc_pig_jars
@@ -107,6 +113,7 @@ class DataProcPigOperator(BaseOperator):
         job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -115,7 +122,7 @@ class DataProcHiveOperator(BaseOperator):
     """
     Start a Hive query Job on a Cloud DataProc cluster.
     """
-    template_fields = ['query', 'variables']
+    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -124,6 +131,7 @@ class DataProcHiveOperator(BaseOperator):
             self,
             query,
             variables=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_hive_properties=None,
             dataproc_hive_jars=None,
@@ -138,6 +146,10 @@ class DataProcHiveOperator(BaseOperator):
         :type query: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_hive_properties: Map for the Pig properties. Ideal to put in
@@ -158,6 +170,7 @@ class DataProcHiveOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.query = query
         self.variables = variables
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_hive_properties
         self.dataproc_jars = dataproc_hive_jars
@@ -172,6 +185,7 @@ class DataProcHiveOperator(BaseOperator):
         job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -180,7 +194,7 @@ class DataProcSparkSqlOperator(BaseOperator):
     """
     Start a Spark SQL query Job on a Cloud DataProc cluster.
     """
-    template_fields = ['query', 'variables']
+    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -189,6 +203,7 @@ class DataProcSparkSqlOperator(BaseOperator):
             self,
             query,
             variables=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_spark_properties=None,
             dataproc_spark_jars=None,
@@ -203,6 +218,10 @@ class DataProcSparkSqlOperator(BaseOperator):
         :type query: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
@@ -223,6 +242,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.query = query
         self.variables = variables
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
@@ -237,6 +257,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -246,7 +267,7 @@ class DataProcSparkOperator(BaseOperator):
     Start a Spark Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments']
+    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -257,6 +278,7 @@ class DataProcSparkOperator(BaseOperator):
             arguments=None,
             archives=None,
             files=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_spark_properties=None,
             dataproc_spark_jars=None,
@@ -280,6 +302,10 @@ class DataProcSparkOperator(BaseOperator):
         :type archives: list
         :param files: List of files to be copied to the working directory
         :type files: list
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
@@ -303,6 +329,7 @@ class DataProcSparkOperator(BaseOperator):
         self.arguments = arguments
         self.archives = archives
         self.files = files
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
@@ -318,6 +345,7 @@ class DataProcSparkOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.add_archive_uris(self.archives)
         job.add_file_uris(self.files)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -327,7 +355,7 @@ class DataProcHadoopOperator(BaseOperator):
     Start a Hadoop Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments']
+    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -338,6 +366,7 @@ class DataProcHadoopOperator(BaseOperator):
             arguments=None,
             archives=None,
             files=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_hadoop_properties=None,
             dataproc_hadoop_jars=None,
@@ -361,6 +390,10 @@ class DataProcHadoopOperator(BaseOperator):
         :type archives: list
         :param files: List of files to be copied to the working directory
         :type files: list
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_hadoop_properties: Map for the Pig properties. Ideal to put in
@@ -384,6 +417,7 @@ class DataProcHadoopOperator(BaseOperator):
         self.arguments = arguments
         self.archives = archives
         self.files = files
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_hadoop_properties
         self.dataproc_jars = dataproc_hadoop_jars
@@ -399,6 +433,7 @@ class DataProcHadoopOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.add_archive_uris(self.archives)
         job.add_file_uris(self.files)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -408,7 +443,7 @@ class DataProcPySparkOperator(BaseOperator):
     Start a PySpark Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments']
+    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -419,6 +454,7 @@ class DataProcPySparkOperator(BaseOperator):
             archives=None,
             pyfiles=None,
             files=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_pyspark_properties=None,
             dataproc_pyspark_jars=None,
@@ -427,35 +463,39 @@ class DataProcPySparkOperator(BaseOperator):
             *args,
             **kwargs):
         """
-         Create a new DataProcPySparkOperator.
+        Create a new DataProcPySparkOperator.
 
-         :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
+        :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
             Python file to use as the driver. Must be a .py file.
-         :type main: string
-         :param arguments: Arguments for the job.
-         :type arguments: list
-         :param archives: List of archived files that will be unpacked in the work
+        :type main: string
+        :param arguments: Arguments for the job.
+        :type arguments: list
+        :param archives: List of archived files that will be unpacked in the work
             directory. Should be stored in Cloud Storage.
-         :type archives: list
-         :param files: List of files to be copied to the working directory
-         :type files: list
-         :param pyfiles: List of Python files to pass to the PySpark framework.
+        :type archives: list
+        :param files: List of files to be copied to the working directory
+        :type files: list
+        :param pyfiles: List of Python files to pass to the PySpark framework.
             Supported file types: .py, .egg, and .zip
-         :type pyfiles: list
-         :param dataproc_cluster: The id of the DataProc cluster.
-         :type dataproc_cluster: string
-         :param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
-             default arguments
-         :type dataproc_pyspark_properties: dict
-         :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
-             for UDFs and libs) and are ideal to put in default arguments.
-         :type dataproc_pyspark_jars: list
-         :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
-         :type gcp_conn_id: string
-         :param delegate_to: The account to impersonate, if any.
-             For this to work, the service account making the request must have
-             domain-wide delegation enabled.
-         :type delegate_to: string
+        :type pyfiles: list
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
+        :param dataproc_cluster: The id of the DataProc cluster.
+        :type dataproc_cluster: string
+        :param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
+            default arguments
+        :type dataproc_pyspark_properties: dict
+        :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
+            for UDFs and libs) and are ideal to put in default arguments.
+        :type dataproc_pyspark_jars: list
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
          """
         super(DataProcPySparkOperator, self).__init__(*args, **kwargs)
         self.gcp_conn_id = gcp_conn_id
@@ -465,6 +505,7 @@ class DataProcPySparkOperator(BaseOperator):
         self.archives = archives
         self.files = files
         self.pyfiles = pyfiles
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_pyspark_properties
         self.dataproc_jars = dataproc_pyspark_jars
@@ -481,5 +522,6 @@ class DataProcPySparkOperator(BaseOperator):
         job.add_archive_uris(self.archives)
         job.add_file_uris(self.files)
         job.add_python_file_uris(self.pyfiles)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
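
As a sanity check of the builder change above, a standalone sketch (assumed,
not part of the patch; the helper name is invented) of how set_job_name()
turns the rendered job_name into the final DataProc jobId:

```python
import uuid

def build_job_id(rendered_job_name):
    # Mirrors the hook change: rendered name + "_" + random 8-character suffix.
    return rendered_job_name + "_" + str(uuid.uuid1())[:8]

# With the default template '{{task.task_id}}_{{ds_nodash}}' rendered for
# task 'dataproc_pig' on 2016-10-24, the jobId looks like:
#   dataproc_pig_20161024_<8 random hex chars>
print(build_job_id('dataproc_pig_20161024'))
```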
