[AIRFLOW-762] Add Google DataProc delete operator

Pair the recently added Create operator with a Delete operator
for Google Cloud DataProc clusters.

Closes #1997 from alexvanboxel/pr/dataproc

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a8b2f7f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a8b2f7f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a8b2f7f2

Branch: refs/heads/v1-8-test
Commit: a8b2f7f26fb8ca16b8ecdfb40ec2e85210fdf75e
Parents: 89f0ca4
Author: Alex Van Boxel <a...@vanboxel.be>
Authored: Mon Jan 16 17:38:28 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jan 16 17:38:28 2017 +0100

 airflow/contrib/operators/dataproc_operator.py | 72 +++++++++++++++++++++
 1 file changed, 72 insertions(+)

diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 9cf2bbe..24fa2e4 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -270,6 +270,78 @@ class DataprocClusterCreateOperator(BaseOperator):
+class DataprocClusterDeleteOperator(BaseOperator):
+    """
+    Delete a cluster on Google Cloud Dataproc. The operator will wait until the
+    cluster is destroyed.
+    """
+    template_fields = ['cluster_name']
+    @apply_defaults
+    def __init__(self,
+                 cluster_name,
+                 project_id,
+                 region='global',
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        """
+        Delete a cluster on Google Cloud Dataproc.
+        :param cluster_name: The name of the cluster to delete
+        :type cluster_name: string
+        :param project_id: The ID of the google cloud project in which
+            the cluster runs
+        :type project_id: string
+        :param region: leave as 'global', might become relevant in the future
+        :type region: string
+        :param google_cloud_conn_id: The connection id to use when connecting to dataproc
+        :type google_cloud_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have 
+            delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataprocClusterDeleteOperator, self).__init__(*args, **kwargs)
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.region = region
+    def _wait_for_done(self, service, operation_name):
+        time.sleep(15)
+        while True:
+            response = service.projects().regions().operations().get(
+                name=operation_name
+            ).execute()
+            if 'done' in response and response['done']:
+                if 'error' in response:
+                    raise Exception(str(response['error']))
+                else:
+                    return
+            time.sleep(15)
+    def execute(self, context):
+        hook = DataProcHook(
+            gcp_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to
+        )
+        service = hook.get_conn()
+        response = service.projects().regions().clusters().delete(
+            projectId=self.project_id,
+            region=self.region,
+            clusterName=self.cluster_name
+        ).execute()
+        operation_name = response['name']
+        logging.info("Cluster delete operation name: {}".format(operation_name))
+        self._wait_for_done(service, operation_name)
 class DataProcPigOperator(BaseOperator):
     Start a Pig query Job on a Cloud DataProc cluster. The parameters of the 

Reply via email to