This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new fe1eace [BEAM-5315] Python 3 port bigquery and pubsub related io modules new a5ed104 Merge pull request #7685 from Juta/io-gcp fe1eace is described below commit fe1eace21fd75e83baa6012d072a19a9c1742bb1 Author: Juta <juta_st...@hotmail.com> AuthorDate: Thu Jan 31 12:12:46 2019 +0100 [BEAM-5315] Python 3 port bigquery and pubsub related io modules --- sdks/python/apache_beam/io/gcp/pubsub.py | 3 +- sdks/python/apache_beam/io/gcp/pubsub_test.py | 34 +++++++++++----------- .../io/gcp/tests/pubsub_matcher_test.py | 12 ++++---- sdks/python/tox.ini | 2 +- 4 files changed, 26 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 53da3b4..0711b70 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -27,6 +27,7 @@ from __future__ import absolute_import import re from builtins import object +from future.utils import iteritems from past.builtins import unicode from apache_beam import coders @@ -114,7 +115,7 @@ class PubsubMessage(object): """ msg = pubsub.types.pubsub_pb2.PubsubMessage() msg.data = self.data - for key, value in self.attributes.iteritems(): + for key, value in iteritems(self.attributes): msg.attributes[key] = value return msg.SerializeToString() diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 04f8802..612ded6 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -74,7 +74,7 @@ class TestPubsubMessage(unittest.TestCase): @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') def test_proto_conversion(self): - data = 'data' + data = b'data' attributes = {'k1': 'v1', 'k2': 'v2'} m = PubsubMessage(data, attributes) m_converted = PubsubMessage._from_proto_str(m._to_proto_str()) @@ -82,25 +82,25 @@ class 
TestPubsubMessage(unittest.TestCase): self.assertEqual(m_converted.attributes, attributes) def test_eq(self): - a = PubsubMessage('abc', {1: 2, 3: 4}) - b = PubsubMessage('abc', {1: 2, 3: 4}) - c = PubsubMessage('abc', {1: 2}) + a = PubsubMessage(b'abc', {1: 2, 3: 4}) + b = PubsubMessage(b'abc', {1: 2, 3: 4}) + c = PubsubMessage(b'abc', {1: 2}) self.assertTrue(a == b) self.assertTrue(a != c) self.assertTrue(b != c) def test_hash(self): - a = PubsubMessage('abc', {1: 2, 3: 4}) - b = PubsubMessage('abc', {1: 2, 3: 4}) - c = PubsubMessage('abc', {1: 2}) + a = PubsubMessage(b'abc', {1: 2, 3: 4}) + b = PubsubMessage(b'abc', {1: 2, 3: 4}) + c = PubsubMessage(b'abc', {1: 2}) self.assertTrue(hash(a) == hash(b)) self.assertTrue(hash(a) != hash(c)) self.assertTrue(hash(b) != hash(c)) def test_repr(self): - a = PubsubMessage('abc', {1: 2, 3: 4}) - b = PubsubMessage('abc', {1: 2, 3: 4}) - c = PubsubMessage('abc', {1: 2}) + a = PubsubMessage(b'abc', {1: 2, 3: 4}) + b = PubsubMessage(b'abc', {1: 2, 3: 4}) + c = PubsubMessage(b'abc', {1: 2}) self.assertTrue(repr(a) == repr(b)) self.assertTrue(repr(a) != repr(c)) self.assertTrue(repr(b) != repr(c)) @@ -333,7 +333,7 @@ transform_evaluator.TransformEvaluatorRegistry._test_evaluators_overrides = { class TestReadFromPubSub(unittest.TestCase): def test_read_messages_success(self, mock_pubsub): - data = 'data' + data = b'data' publish_time_secs = 1520861821 publish_time_nanos = 234567000 attributes = {'key': 'value'} @@ -399,7 +399,7 @@ class TestReadFromPubSub(unittest.TestCase): mock.call(mock.ANY, [ack_id])]) def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub): - data = 'data' + data = b'data' attributes = {'time': '1337'} publish_time_secs = 1520861821 publish_time_nanos = 234567000 @@ -429,7 +429,7 @@ class TestReadFromPubSub(unittest.TestCase): mock.call(mock.ANY, [ack_id])]) def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub): - data = 'data' + data = b'data' attributes = {'time': 
'2018-03-12T13:37:01.234567Z'} publish_time_secs = 1337000000 publish_time_nanos = 133700000 @@ -459,7 +459,7 @@ class TestReadFromPubSub(unittest.TestCase): mock.call(mock.ANY, [ack_id])]) def test_read_messages_timestamp_attribute_missing(self, mock_pubsub): - data = 'data' + data = b'data' attributes = {} publish_time_secs = 1520861821 publish_time_nanos = 234567000 @@ -490,7 +490,7 @@ class TestReadFromPubSub(unittest.TestCase): mock.call(mock.ANY, [ack_id])]) def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub): - data = 'data' + data = b'data' attributes = {'time': '1337 unparseable'} publish_time_secs = 1520861821 publish_time_nanos = 234567000 @@ -557,7 +557,7 @@ class TestWriteToPubSub(unittest.TestCase): mock.call(mock.ANY, data)]) def test_write_messages_with_attributes_success(self, mock_pubsub): - data = 'data' + data = b'data' attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] @@ -589,7 +589,7 @@ class TestWriteToPubSub(unittest.TestCase): p.run() def test_write_messages_unsupported_features(self, mock_pubsub): - data = 'data' + data = b'data' attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py index 1261aa1..bfed329 100644 --- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py @@ -59,7 +59,7 @@ class PubSubMatcherTest(unittest.TestCase): def test_message_matcher_success(self, mock_get_sub, unsued_mock): self.init_matcher() - self.pubsub_matcher.expected_msg = ['a', 'b'] + self.pubsub_matcher.expected_msg = [b'a', b'b'] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ create_pull_response([PullResponseMessage(b'a', {})]), @@ -71,7 +71,7 @@ class PubSubMatcherTest(unittest.TestCase): def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock): 
self.init_matcher(with_attributes=True) - self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ create_pull_response([PullResponseMessage(b'a', {'k': 'v'})]) @@ -82,7 +82,7 @@ class PubSubMatcherTest(unittest.TestCase): def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True) - self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {})] mock_sub = mock_get_sub.return_value # Unexpected attribute 'k'. mock_sub.pull.side_effect = [ @@ -96,7 +96,7 @@ class PubSubMatcherTest(unittest.TestCase): def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True, strip_attributes=['id', 'timestamp']) - self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [create_pull_response([ PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'}) @@ -108,7 +108,7 @@ class PubSubMatcherTest(unittest.TestCase): def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True, strip_attributes=['id', 'timestamp']) - self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})] mock_sub = mock_get_sub.return_value # Message is missing attribute 'timestamp'. 
mock_sub.pull.side_effect = [create_pull_response([ @@ -130,7 +130,7 @@ class PubSubMatcherTest(unittest.TestCase): with self.assertRaises(AssertionError) as error: hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) - self.assertCountEqual(['c', 'd'], self.pubsub_matcher.messages) + self.assertCountEqual([b'c', b'd'], self.pubsub_matcher.messages) self.assertTrue( '\nExpected: Expected 1 messages.\n but: Got 2 messages.' in str(error.exception.args[0])) diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index dbccd56..fc3bad2 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -99,7 +99,7 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 extras = test,gcp modules = - apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.i [...] + apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.i [...] commands = python --version pip --version