Repository: incubator-airflow
Updated Branches:
  refs/heads/master 096ba9ecd -> 3e7e42f02


[AIRFLOW-2563] Fix PigCliHook Python 3 string/bytes use

Unit tests were also added for PigCliHook to prevent
future regressions.

Closes #3594 from jakahn/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3e7e42f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3e7e42f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3e7e42f0

Branch: refs/heads/master
Commit: 3e7e42f028279a628d9e15d1ae4b6005593f8afb
Parents: 096ba9e
Author: Jasper Kahn <jak...@google.com>
Authored: Fri Jul 27 16:08:32 2018 -0700
Committer: Arthur Wiedmer <awied...@netflix.com>
Committed: Fri Jul 27 16:08:32 2018 -0700

----------------------------------------------------------------------
 airflow/hooks/pig_hook.py    |   6 +-
 tests/hooks/test_pig_hook.py | 137 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 140 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e7e42f0/airflow/hooks/pig_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py
index a3836b1..fcfcc7f 100644
--- a/airflow/hooks/pig_hook.py
+++ b/airflow/hooks/pig_hook.py
@@ -55,7 +55,7 @@ class PigCliHook(BaseHook):
 
         with TemporaryDirectory(prefix='airflow_pigop_') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir) as f:
-                f.write(pig)
+                f.write(pig.encode('utf-8'))
                 f.flush()
                 fname = f.name
                 pig_bin = 'pig'
@@ -76,8 +76,8 @@ class PigCliHook(BaseHook):
                     close_fds=True)
                 self.sp = sp
                 stdout = ''
-                for line in iter(sp.stdout.readline, ''):
-                    stdout += line
+                for line in iter(sp.stdout.readline, b''):
+                    stdout += line.decode('utf-8')
                     if verbose:
                         self.log.info(line.strip())
                 sp.wait()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e7e42f0/tests/hooks/test_pig_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_pig_hook.py b/tests/hooks/test_pig_hook.py
new file mode 100644
index 0000000..c250d23
--- /dev/null
+++ b/tests/hooks/test_pig_hook.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.hooks.pig_hook import PigCliHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestPigCliHook(unittest.TestCase):
+
+    def setUp(self):
+        super(TestPigCliHook, self).setUp()
+
+        self.extra_dejson = mock.MagicMock()
+        self.extra_dejson.get.return_value = None
+        self.conn = mock.MagicMock()
+        self.conn.extra_dejson = self.extra_dejson
+        conn = self.conn
+
+        class SubPigCliHook(PigCliHook):
+            def get_connection(self, id):
+                return conn
+
+        self.pig_hook = SubPigCliHook
+
+    def test_init(self):
+        self.pig_hook()
+        self.extra_dejson.get.assert_called_with('pig_properties', '')
+
+    @mock.patch('subprocess.Popen')
+    def test_run_cli_success(self, popen_mock):
+        proc_mock = mock.MagicMock()
+        proc_mock.returncode = 0
+        proc_mock.stdout.readline.return_value = b''
+        popen_mock.return_value = proc_mock
+
+        hook = self.pig_hook()
+        stdout = hook.run_cli("")
+
+        self.assertEqual(stdout, "")
+
+    @mock.patch('subprocess.Popen')
+    def test_run_cli_fail(self, popen_mock):
+        proc_mock = mock.MagicMock()
+        proc_mock.returncode = 1
+        proc_mock.stdout.readline.return_value = b''
+        popen_mock.return_value = proc_mock
+
+        hook = self.pig_hook()
+
+        from airflow.exceptions import AirflowException
+        self.assertRaises(AirflowException, hook.run_cli, "")
+
+    @mock.patch('subprocess.Popen')
+    def test_run_cli_with_properties(self, popen_mock):
+        test_properties = "one two"
+
+        proc_mock = mock.MagicMock()
+        proc_mock.returncode = 0
+        proc_mock.stdout.readline.return_value = b''
+        popen_mock.return_value = proc_mock
+
+        hook = self.pig_hook()
+        hook.pig_properties = test_properties
+
+        stdout = hook.run_cli("")
+        self.assertEqual(stdout, "")
+
+        popen_first_arg = popen_mock.call_args[0][0]
+        for pig_prop in test_properties.split():
+            self.assertIn(pig_prop, popen_first_arg)
+
+    @mock.patch('subprocess.Popen')
+    def test_run_cli_verbose(self, popen_mock):
+        test_stdout_lines = [b"one", b"two", b""]
+        test_stdout_strings = [s.decode('utf-8') for s in test_stdout_lines]
+
+        proc_mock = mock.MagicMock()
+        proc_mock.returncode = 0
+        proc_mock.stdout.readline = mock.Mock(side_effect=test_stdout_lines)
+        popen_mock.return_value = proc_mock
+
+        hook = self.pig_hook()
+        stdout = hook.run_cli("", verbose=True)
+
+        self.assertEqual(stdout, "".join(test_stdout_strings))
+
+    def test_kill_no_sp(self):
+        sp_mock = mock.Mock()
+        hook = self.pig_hook()
+        hook.sp = sp_mock
+
+        hook.kill()
+        self.assertFalse(sp_mock.kill.called)
+
+    def test_kill_sp_done(self):
+        sp_mock = mock.Mock()
+        sp_mock.poll.return_value = 0
+
+        hook = self.pig_hook()
+        hook.sp = sp_mock
+
+        hook.kill()
+        self.assertFalse(sp_mock.kill.called)
+
+    def test_kill(self):
+        sp_mock = mock.Mock()
+        sp_mock.poll.return_value = None
+
+        hook = self.pig_hook()
+        hook.sp = sp_mock
+
+        hook.kill()
+        self.assertTrue(sp_mock.kill.called)

Reply via email to