Repository: incubator-airflow
Updated Branches:
  refs/heads/master 54f1c11b6 -> 7f6d87740


[AIRFLOW-244] Modify hive operator to inject analysis data

Testing Done:
Test dags were run as backfills on an Airbnb Airflow dev box.

This PR exposes DAG, DAG run, task, and task instance metadata through the
HiveOperator (as HiveConf key/value pairs) for ingestion by performance
analysis tools like Dr. Elephant.
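For context, the injected entries take the form sketched below. This is only an
illustration: the keys come from the new context_to_airflow_vars helper added in
this commit, while the dag/task ids and dates are made-up sample values.

    # Illustrative only: sample of the HiveConf entries a task run would inject.
    # Keys match airflow.utils.operator_helpers.context_to_airflow_vars; the
    # values here are fake.
    injected = {
        'airflow.ctx.dag.dag_id': 'example_dag',
        'airflow.ctx.dag_run.execution_date': '2016-06-21T00:00:00',
        'airflow.ctx.task.task_id': 'run_hql',
        'airflow.ctx.task_instance.execution_date': '2016-06-21T00:00:00',
    }
    # Each pair reaches the hive CLI as: -hiveconf key=value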

Closes #1607 from paulbramsen/paulbramsen/modify_hive_operator_to_inject_analysis_data


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7f6d8774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7f6d8774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7f6d8774

Branch: refs/heads/master
Commit: 7f6d877409175cf5b0049406d808030d58dcdb24
Parents: 54f1c11
Author: Paul Bramsen <paul.bram...@airbnb.com>
Authored: Tue Jun 21 11:54:40 2016 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Jun 21 11:54:40 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py        | 18 +++++++++++++--
 airflow/operators/hive_operator.py |  4 +++-
 airflow/utils/operator_helpers.py  | 39 +++++++++++++++++++++++++++++++++
 3 files changed, 58 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7f6d8774/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 87cce6a..e24cf86 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -16,6 +16,7 @@
 from __future__ import print_function
 from builtins import zip
 from past.builtins import basestring
+
 import unicodecsv as csv
 import logging
 import re
@@ -41,6 +42,7 @@ class HiveCliHook(BaseHook):
     Note that you can also set default hive CLI parameters using the
     ``hive_cli_params`` to be used in your connection as in
     ``{"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}``
+    Parameters passed here can be overridden by run_cli's hive_conf param
 
     The extra connection parameter ``auth`` gets passed as in the ``jdbc``
     connection string as is.
@@ -57,9 +59,17 @@ class HiveCliHook(BaseHook):
         self.conn = conn
         self.run_as = run_as
 
-    def run_cli(self, hql, schema=None, verbose=True):
+    def run_cli(self, hql, schema=None, verbose=True, hive_conf=None):
         """
-        Run an hql statement using the hive cli
+        Run an hql statement using the hive cli. If hive_conf is specified it should be a
+        dict and the entries will be set as key/value pairs in HiveConf
+
+
+        :param hive_conf: if specified these key value pairs will be passed to hive as
+            ``-hiveconf "key"="value"``. Note that they will be passed after the
+            ``hive_cli_params`` and thus will override whatever values are specified in
+            the database.
+        :type hive_conf: dict
 
         >>> hh = HiveCliHook()
         >>> result = hh.run_cli("USE airflow;")
@@ -107,6 +117,10 @@ class HiveCliHook(BaseHook):
                     if conn.password:
                         cmd_extra += ['-p', conn.password]
 
+                hive_conf = hive_conf or {}
+                for key, value in hive_conf.items():
+                    cmd_extra += ['-hiveconf', '{0}={1}'.format(key, value)]
+
                 hive_cmd = [hive_bin, '-f', fname] + cmd_extra
 
                 if self.hive_cli_params:

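As a rough usage sketch of the new hive_conf argument (the HQL, schema, and conf
values below are placeholders, not part of this commit):

    from airflow.hooks.hive_hooks import HiveCliHook

    hook = HiveCliHook()  # uses the default hive_cli connection
    # hive_conf entries are appended to the command line as -hiveconf key=value;
    # per the docstring above they land after hive_cli_params, so they take
    # precedence on conflicting keys.
    hook.run_cli(
        hql="SELECT count(*) FROM some_table;",
        schema='default',
        hive_conf={'mapred.job.queue.name': 'analytics'},
    )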
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7f6d8774/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 9849e9d..3763d6b 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -18,6 +18,7 @@ import re
 from airflow.hooks.hive_hooks import HiveCliHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.operator_helpers import context_to_airflow_vars
 
 
 class HiveOperator(BaseOperator):
@@ -76,7 +77,8 @@ class HiveOperator(BaseOperator):
     def execute(self, context):
         logging.info('Executing: ' + self.hql)
         self.hook = self.get_hook()
-        self.hook.run_cli(hql=self.hql, schema=self.schema)
+        self.hook.run_cli(hql=self.hql, schema=self.schema,
+                          hive_conf=context_to_airflow_vars(context))
 
     def dry_run(self):
         self.hook = self.get_hook()

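From the operator side, no call-site changes are needed; every existing
HiveOperator task now gets the context pairs injected when execute() runs. A
hedged sketch (the dag id, task id, and query below are hypothetical):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.hive_operator import HiveOperator

    dag = DAG('hive_analysis_example', start_date=datetime(2016, 6, 21))

    # Nothing extra to pass: execute() forwards
    # hive_conf=context_to_airflow_vars(context) on every run, so tools that
    # read the Hive job configuration (e.g. Dr. Elephant) can tie the job back
    # to this dag/task/execution date.
    annotated_query = HiveOperator(
        task_id='annotated_query',
        hql="SELECT * FROM some_table LIMIT 10;",
        dag=dag,
    )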
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7f6d8774/airflow/utils/operator_helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py
new file mode 100644
index 0000000..617976e
--- /dev/null
+++ b/airflow/utils/operator_helpers.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+def context_to_airflow_vars(context):
+    """
+    Given a context, this function provides a dictionary of values that can be used to
+    externally reconstruct relations between dags, dag_runs, tasks and task_instances.
+
+    :param context: The context for the task_instance of interest
+    :type context: dict
+    """
+    params = dict()
+    dag = context['dag']
+    if dag and dag.dag_id:
+        params['airflow.ctx.dag.dag_id'] = dag.dag_id
+    dag_run = context['dag_run']
+    if dag_run and dag_run.execution_date:
+        params['airflow.ctx.dag_run.execution_date'] = dag_run.execution_date.isoformat()
+    task = context['task']
+    if task and task.task_id:
+        params['airflow.ctx.task.task_id'] = task.task_id
+    task_instance = context['task_instance']
+    if task_instance and task_instance.execution_date:
+        params['airflow.ctx.task_instance.execution_date'] = \
+            task_instance.execution_date.isoformat()
+    return params

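Finally, a small self-contained sketch of what the helper returns, using stub
objects in place of the real DAG/DagRun/Task/TaskInstance models (only the
attributes the helper reads are stubbed; all values are invented):

    from collections import namedtuple
    from datetime import datetime

    from airflow.utils.operator_helpers import context_to_airflow_vars

    # Minimal stand-ins for the Airflow model objects found in a task context.
    Dag = namedtuple('Dag', ['dag_id'])
    DagRun = namedtuple('DagRun', ['execution_date'])
    Task = namedtuple('Task', ['task_id'])
    TaskInstance = namedtuple('TaskInstance', ['execution_date'])

    context = {
        'dag': Dag(dag_id='example_dag'),
        'dag_run': DagRun(execution_date=datetime(2016, 6, 21)),
        'task': Task(task_id='run_hql'),
        'task_instance': TaskInstance(execution_date=datetime(2016, 6, 21)),
    }

    print(context_to_airflow_vars(context))
    # {'airflow.ctx.dag.dag_id': 'example_dag',
    #  'airflow.ctx.dag_run.execution_date': '2016-06-21T00:00:00',
    #  'airflow.ctx.task.task_id': 'run_hql',
    #  'airflow.ctx.task_instance.execution_date': '2016-06-21T00:00:00'}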