Repository: incubator-airflow Updated Branches: refs/heads/master 9218a2167 -> 6632b0ce1
[AIRFLOW-1309] Allow hive_to_druid to take tblproperties Dear Airflow maintainers, Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below! ### JIRA - [ ] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-1309 ### Description - [ ] Here are some details about my PR, including screenshots of any UI changes: Add optional tblproperties for the druid hook ### Tests - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: Will add ### Commits - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. 
Body explains "what" and "why", not "how" Closes #2368 from saguziel/aguziel-update-hive-to-druid Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6632b0ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6632b0ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6632b0ce Branch: refs/heads/master Commit: 6632b0ce1cf01c82cb81e8a73af70212cf65ddd0 Parents: 9218a21 Author: Alex Guziel <alex.guz...@airbnb.com> Authored: Thu Sep 14 15:40:13 2017 -0700 Committer: Alex Guziel <alex.guz...@airbnb.com> Committed: Thu Sep 14 15:40:13 2017 -0700 ---------------------------------------------------------------------- airflow/operators/hive_to_druid.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6632b0ce/airflow/operators/hive_to_druid.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py index 7ac0b02..d7b1b82 100644 --- a/airflow/operators/hive_to_druid.py +++ b/airflow/operators/hive_to_druid.py @@ -46,6 +46,9 @@ class HiveToDruidTransfer(BaseOperator): :param intervals: list of time intervals that defines segments, this is passed as is to the json object :type intervals: list + :param hive_tblproperties: additional properties for tblproperties in + hive for the staging table + :type hive_tblproperties: dict """ template_fields = ('sql', 'intervals') @@ -67,6 +70,7 @@ class HiveToDruidTransfer(BaseOperator): target_partition_size=-1, query_granularity="NONE", segment_granularity="DAY", + hive_tblproperties=None, *args, **kwargs): super(HiveToDruidTransfer, self).__init__(*args, **kwargs) self.sql = sql @@ -84,12 +88,14 @@ class HiveToDruidTransfer(BaseOperator): self.hadoop_dependency_coordinates 
= hadoop_dependency_coordinates self.druid_ingest_conn_id = druid_ingest_conn_id self.metastore_conn_id = metastore_conn_id + self.hive_tblproperties = hive_tblproperties def execute(self, context): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) self.logger.info("Extracting data from Hive") hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_') sql = self.sql.strip().strip(';') + tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()]) hql = """\ SET mapred.output.compress=false; SET hive.exec.compress.output=false; @@ -97,7 +103,7 @@ class HiveToDruidTransfer(BaseOperator): CREATE TABLE {hive_table} ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE - TBLPROPERTIES ('serialization.null.format' = '') + TBLPROPERTIES ('serialization.null.format' = ''{tblproperties}) AS {sql} """.format(**locals())