Repository: incubator-airflow Updated Branches: refs/heads/master 54f1c11b6 -> 7f6d87740
[AIRFLOW-244] Modify hive operator to inject analysis data Testing Done: Test dags were run as backfills on an Airbnb Airflow dev box. This PR exposes task/dag id/run data through the HiveOperator for ingestion by performance analysis tools like Dr. Elephant. Closes #1607 from paulbramsen/paulbramsen/modify_hive_operator_to_inject_analysis_data Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7f6d8774 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7f6d8774 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7f6d8774 Branch: refs/heads/master Commit: 7f6d877409175cf5b0049406d808030d58dcdb24 Parents: 54f1c11 Author: Paul Bramsen <paul.bram...@airbnb.com> Authored: Tue Jun 21 11:54:40 2016 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Tue Jun 21 11:54:40 2016 -0700 ---------------------------------------------------------------------- airflow/hooks/hive_hooks.py | 18 +++++++++++++-- airflow/operators/hive_operator.py | 4 +++- airflow/utils/operator_helpers.py | 39 +++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7f6d8774/airflow/hooks/hive_hooks.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 87cce6a..e24cf86 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -16,6 +16,7 @@ from __future__ import print_function from builtins import zip from past.builtins import basestring + import unicodecsv as csv import logging import re @@ -41,6 +42,7 @@ class HiveCliHook(BaseHook): Note that you can also set default hive CLI parameters using the ``hive_cli_params`` to be used in your connection as in ``{"hive_cli_params": "-hiveconf 
mapred.job.tracker=some.jobtracker:444"}`` + Parameters passed here can be overridden by run_cli's hive_conf param The extra connection parameter ``auth`` gets passed as in the ``jdbc`` connection string as is. @@ -57,9 +59,17 @@ class HiveCliHook(BaseHook): self.conn = conn self.run_as = run_as - def run_cli(self, hql, schema=None, verbose=True): + def run_cli(self, hql, schema=None, verbose=True, hive_conf=None): """ - Run an hql statement using the hive cli + Run an hql statement using the hive cli. If hive_conf is specified it should be a + dict and the entries will be set as key/value pairs in HiveConf + + + :param hive_conf: if specified these key value pairs will be passed to hive as + ``-hiveconf "key"="value"``. Note that they will be passed after the + ``hive_cli_params`` and thus will override whatever values are specified in + the database. + :type hive_conf: dict >>> hh = HiveCliHook() >>> result = hh.run_cli("USE airflow;") @@ -107,6 +117,10 @@ class HiveCliHook(BaseHook): if conn.password: cmd_extra += ['-p', conn.password] + hive_conf = hive_conf or {} + for key, value in hive_conf.items(): + cmd_extra += ['-hiveconf', '{0}={1}'.format(key, value)] + hive_cmd = [hive_bin, '-f', fname] + cmd_extra if self.hive_cli_params: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7f6d8774/airflow/operators/hive_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py index 9849e9d..3763d6b 100644 --- a/airflow/operators/hive_operator.py +++ b/airflow/operators/hive_operator.py @@ -18,6 +18,7 @@ import re from airflow.hooks.hive_hooks import HiveCliHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults +from airflow.utils.operator_helpers import context_to_airflow_vars class HiveOperator(BaseOperator): @@ -76,7 +77,8 @@ class HiveOperator(BaseOperator): def execute(self, context): 
logging.info('Executing: ' + self.hql) self.hook = self.get_hook() - self.hook.run_cli(hql=self.hql, schema=self.schema) + self.hook.run_cli(hql=self.hql, schema=self.schema, + hive_conf=context_to_airflow_vars(context)) def dry_run(self): self.hook = self.get_hook() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7f6d8774/airflow/utils/operator_helpers.py ---------------------------------------------------------------------- diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py new file mode 100644 index 0000000..617976e --- /dev/null +++ b/airflow/utils/operator_helpers.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +def context_to_airflow_vars(context): + """ + Given a context, this function provides a dictionary of values that can be used to + externally reconstruct relations between dags, dag_runs, tasks and task_instances. 
+ + :param context: The context for the task_instance of interest + :type context: dict + """ + params = dict() + dag = context['dag'] + if dag and dag.dag_id: + params['airflow.ctx.dag.dag_id'] = dag.dag_id + dag_run = context['dag_run'] + if dag_run and dag_run.execution_date: + params['airflow.ctx.dag_run.execution_date'] = dag_run.execution_date.isoformat() + task = context['task'] + if task and task.task_id: + params['airflow.ctx.task.task_id'] = task.task_id + task_instance = context['task_instance'] + if task_instance and task_instance.execution_date: + params['airflow.ctx.task_instance.execution_date'] = \ + task_instance.execution_date.isoformat() + return params