Repository: incubator-airflow
Updated Branches:
  refs/heads/master 518e0073a -> 3b84bcb3e

[AIRFLOW-280] clean up tmp druid table no matter if an ingestion job succeeds or not

Closes #1624 from hongbozeng/cleanup_druid

clean up tmp druid table no matter if the ingestion job succeeds or not

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b84bcb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b84bcb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b84bcb3

Branch: refs/heads/master
Commit: 3b84bcb3eef629da69d10db8a99418afd0386193
Parents: 518e007
Author: Hongbo Zeng <hongbo.z...@airbnb.com>
Authored: Sat Jun 25 10:35:00 2016 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Sat Jun 25 10:35:00 2016 -0700

----------------------------------------------------------------------
 airflow/operators/hive_to_druid.py | 27 ++++++++++++++-------------
 1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b84bcb3/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 6d73e17..5ed5145 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -116,16 +116,17 @@ class HiveToDruidTransfer(BaseOperator):
         logging.info("Inserting rows into Druid")
         logging.info("HDFS path: " + static_path)
 
-        druid.load_from_hdfs(
-            datasource=self.druid_datasource,
-            intervals=self.intervals,
-            static_path=static_path, ts_dim=self.ts_dim,
-            columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
-            metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
-        logging.info("Load seems to have succeeded!")
-
-        logging.info(
-            "Cleaning up by dropping the temp "
-            "Hive table {}".format(hive_table))
-        hql = "DROP TABLE IF EXISTS {}".format(hive_table)
-        hive.run_cli(hql)
+        try:
+            druid.load_from_hdfs(
+                datasource=self.druid_datasource,
+                intervals=self.intervals,
+                static_path=static_path, ts_dim=self.ts_dim,
+                columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
+                metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
+            logging.info("Load seems to have succeeded!")
+        finally:
+            logging.info(
+                "Cleaning up by dropping the temp "
+                "Hive table {}".format(hive_table))
+            hql = "DROP TABLE IF EXISTS {}".format(hive_table)
+            hive.run_cli(hql)