Repository: incubator-airflow Updated Branches: refs/heads/master 096ba9ecd -> 3e7e42f02
[AIRFLOW-2563] Fix PigCliHook Python 3 string/bytes use. Unit tests were added for PigCliHook as well to prevent future issues. Closes #3594 from jakahn/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3e7e42f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3e7e42f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3e7e42f0 Branch: refs/heads/master Commit: 3e7e42f028279a628d9e15d1ae4b6005593f8afb Parents: 096ba9e Author: Jasper Kahn <jak...@google.com> Authored: Fri Jul 27 16:08:32 2018 -0700 Committer: Arthur Wiedmer <awied...@netflix.com> Committed: Fri Jul 27 16:08:32 2018 -0700 ---------------------------------------------------------------------- airflow/hooks/pig_hook.py | 6 +- tests/hooks/test_pig_hook.py | 137 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e7e42f0/airflow/hooks/pig_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py index a3836b1..fcfcc7f 100644 --- a/airflow/hooks/pig_hook.py +++ b/airflow/hooks/pig_hook.py @@ -55,7 +55,7 @@ class PigCliHook(BaseHook): with TemporaryDirectory(prefix='airflow_pigop_') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir) as f: - f.write(pig) + f.write(pig.encode('utf-8')) f.flush() fname = f.name pig_bin = 'pig' @@ -76,8 +76,8 @@ class PigCliHook(BaseHook): close_fds=True) self.sp = sp stdout = '' - for line in iter(sp.stdout.readline, ''): - stdout += line + for line in iter(sp.stdout.readline, b''): + stdout += line.decode('utf-8') if verbose: self.log.info(line.strip()) sp.wait() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e7e42f0/tests/hooks/test_pig_hook.py 
---------------------------------------------------------------------- diff --git a/tests/hooks/test_pig_hook.py b/tests/hooks/test_pig_hook.py new file mode 100644 index 0000000..c250d23 --- /dev/null +++ b/tests/hooks/test_pig_hook.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import unittest +from airflow.hooks.pig_hook import PigCliHook + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class TestPigCliHook(unittest.TestCase): + + def setUp(self): + super(TestPigCliHook, self).setUp() + + self.extra_dejson = mock.MagicMock() + self.extra_dejson.get.return_value = None + self.conn = mock.MagicMock() + self.conn.extra_dejson = self.extra_dejson + conn = self.conn + + class SubPigCliHook(PigCliHook): + def get_connection(self, id): + return conn + + self.pig_hook = SubPigCliHook + + def test_init(self): + self.pig_hook() + self.extra_dejson.get.assert_called_with('pig_properties', '') + + @mock.patch('subprocess.Popen') + def test_run_cli_success(self, popen_mock): + proc_mock = mock.MagicMock() + proc_mock.returncode = 0 + proc_mock.stdout.readline.return_value = b'' + popen_mock.return_value = proc_mock + + hook = self.pig_hook() + stdout = hook.run_cli("") + + self.assertEqual(stdout, "") + + @mock.patch('subprocess.Popen') + def test_run_cli_fail(self, popen_mock): + proc_mock = mock.MagicMock() + proc_mock.returncode = 1 + proc_mock.stdout.readline.return_value = b'' + popen_mock.return_value = proc_mock + + hook = self.pig_hook() + + from airflow.exceptions import AirflowException + self.assertRaises(AirflowException, hook.run_cli, "") + + @mock.patch('subprocess.Popen') + def test_run_cli_with_properties(self, popen_mock): + test_properties = "one two" + + proc_mock = mock.MagicMock() + proc_mock.returncode = 0 + proc_mock.stdout.readline.return_value = b'' + popen_mock.return_value = proc_mock + + hook = self.pig_hook() + hook.pig_properties = test_properties + + stdout = hook.run_cli("") + self.assertEqual(stdout, "") + + popen_first_arg = popen_mock.call_args[0][0] + for pig_prop in test_properties.split(): + self.assertIn(pig_prop, popen_first_arg) + + @mock.patch('subprocess.Popen') + def test_run_cli_verbose(self, popen_mock): + test_stdout_lines = 
[b"one", b"two", b""] + test_stdout_strings = [s.decode('utf-8') for s in test_stdout_lines] + + proc_mock = mock.MagicMock() + proc_mock.returncode = 0 + proc_mock.stdout.readline = mock.Mock(side_effect=test_stdout_lines) + popen_mock.return_value = proc_mock + + hook = self.pig_hook() + stdout = hook.run_cli("", verbose=True) + + self.assertEqual(stdout, "".join(test_stdout_strings)) + + def test_kill_no_sp(self): + sp_mock = mock.Mock() + hook = self.pig_hook() + hook.sp = sp_mock + + hook.kill() + self.assertFalse(sp_mock.kill.called) + + def test_kill_sp_done(self): + sp_mock = mock.Mock() + sp_mock.poll.return_value = 0 + + hook = self.pig_hook() + hook.sp = sp_mock + + hook.kill() + self.assertFalse(sp_mock.kill.called) + + def test_kill(self): + sp_mock = mock.Mock() + sp_mock.poll.return_value = None + + hook = self.pig_hook() + hook.sp = sp_mock + + hook.kill() + self.assertTrue(sp_mock.kill.called)