[AIRFLOW-762] Add Google DataProc delete operator

Pair the recently added Create operator with a Delete operator for Google Cloud DataProc clusters.
Closes #1997 from alexvanboxel/pr/dataproc Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a8b2f7f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a8b2f7f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a8b2f7f2 Branch: refs/heads/v1-8-test Commit: a8b2f7f26fb8ca16b8ecdfb40ec2e85210fdf75e Parents: 89f0ca4 Author: Alex Van Boxel <a...@vanboxel.be> Authored: Mon Jan 16 17:38:28 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Mon Jan 16 17:38:28 2017 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/dataproc_operator.py | 72 +++++++++++++++++++++ 1 file changed, 72 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8b2f7f2/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 9cf2bbe..24fa2e4 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -270,6 +270,78 @@ class DataprocClusterCreateOperator(BaseOperator): self._wait_for_done(service) +class DataprocClusterDeleteOperator(BaseOperator): + """ + Delete a cluster on Google Cloud Dataproc. The operator will wait until the + cluster is destroyed. + """ + + template_fields = ['cluster_name'] + + @apply_defaults + def __init__(self, + cluster_name, + project_id, + region='global', + google_cloud_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + """ + Delete a cluster on Google Cloud Dataproc. 
+ + :param cluster_name: The name of the cluster to delete + :type cluster_name: string + :param project_id: The ID of the google cloud project in which + the cluster runs + :type project_id: string + :param region: leave as 'global', might become relevant in the future + :type region: string + :param google_cloud_conn_id: The connection id to use when connecting to dataproc + :type google_cloud_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + """ + super(DataprocClusterDeleteOperator, self).__init__(*args, **kwargs) + self.google_cloud_conn_id = google_cloud_conn_id + self.delegate_to = delegate_to + self.cluster_name = cluster_name + self.project_id = project_id + self.region = region + + def _wait_for_done(self, service, operation_name): + time.sleep(15) + while True: + response = service.projects().regions().operations().get( + name=operation_name + ).execute() + + if 'done' in response and response['done']: + if 'error' in response: + raise Exception(str(response['error'])) + else: + return + time.sleep(15) + + def execute(self, context): + hook = DataProcHook( + gcp_conn_id=self.google_cloud_conn_id, + delegate_to=self.delegate_to + ) + service = hook.get_conn() + + response = service.projects().regions().clusters().delete( + projectId=self.project_id, + region=self.region, + clusterName=self.cluster_name + ).execute() + operation_name = response['name'] + logging.info("Cluster delete operation name: {}".format(operation_name)) + self._wait_for_done(service, operation_name) + + class DataProcPigOperator(BaseOperator): """ Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation