This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fe1eace  [BEAM-5315] Python 3 port bigquery and pubsub related io 
modules
     new a5ed104  Merge pull request #7685 from Juta/io-gcp
fe1eace is described below

commit fe1eace21fd75e83baa6012d072a19a9c1742bb1
Author: Juta <juta_st...@hotmail.com>
AuthorDate: Thu Jan 31 12:12:46 2019 +0100

    [BEAM-5315] Python 3 port bigquery and pubsub related io modules
---
 sdks/python/apache_beam/io/gcp/pubsub.py           |  3 +-
 sdks/python/apache_beam/io/gcp/pubsub_test.py      | 34 +++++++++++-----------
 .../io/gcp/tests/pubsub_matcher_test.py            | 12 ++++----
 sdks/python/tox.ini                                |  2 +-
 4 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py 
b/sdks/python/apache_beam/io/gcp/pubsub.py
index 53da3b4..0711b70 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -27,6 +27,7 @@ from __future__ import absolute_import
 import re
 from builtins import object
 
+from future.utils import iteritems
 from past.builtins import unicode
 
 from apache_beam import coders
@@ -114,7 +115,7 @@ class PubsubMessage(object):
     """
     msg = pubsub.types.pubsub_pb2.PubsubMessage()
     msg.data = self.data
-    for key, value in self.attributes.iteritems():
+    for key, value in iteritems(self.attributes):
       msg.attributes[key] = value
     return msg.SerializeToString()
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py 
b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 04f8802..612ded6 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -74,7 +74,7 @@ class TestPubsubMessage(unittest.TestCase):
 
   @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
   def test_proto_conversion(self):
-    data = 'data'
+    data = b'data'
     attributes = {'k1': 'v1', 'k2': 'v2'}
     m = PubsubMessage(data, attributes)
     m_converted = PubsubMessage._from_proto_str(m._to_proto_str())
@@ -82,25 +82,25 @@ class TestPubsubMessage(unittest.TestCase):
     self.assertEqual(m_converted.attributes, attributes)
 
   def test_eq(self):
-    a = PubsubMessage('abc', {1: 2, 3: 4})
-    b = PubsubMessage('abc', {1: 2, 3: 4})
-    c = PubsubMessage('abc', {1: 2})
+    a = PubsubMessage(b'abc', {1: 2, 3: 4})
+    b = PubsubMessage(b'abc', {1: 2, 3: 4})
+    c = PubsubMessage(b'abc', {1: 2})
     self.assertTrue(a == b)
     self.assertTrue(a != c)
     self.assertTrue(b != c)
 
   def test_hash(self):
-    a = PubsubMessage('abc', {1: 2, 3: 4})
-    b = PubsubMessage('abc', {1: 2, 3: 4})
-    c = PubsubMessage('abc', {1: 2})
+    a = PubsubMessage(b'abc', {1: 2, 3: 4})
+    b = PubsubMessage(b'abc', {1: 2, 3: 4})
+    c = PubsubMessage(b'abc', {1: 2})
     self.assertTrue(hash(a) == hash(b))
     self.assertTrue(hash(a) != hash(c))
     self.assertTrue(hash(b) != hash(c))
 
   def test_repr(self):
-    a = PubsubMessage('abc', {1: 2, 3: 4})
-    b = PubsubMessage('abc', {1: 2, 3: 4})
-    c = PubsubMessage('abc', {1: 2})
+    a = PubsubMessage(b'abc', {1: 2, 3: 4})
+    b = PubsubMessage(b'abc', {1: 2, 3: 4})
+    c = PubsubMessage(b'abc', {1: 2})
     self.assertTrue(repr(a) == repr(b))
     self.assertTrue(repr(a) != repr(c))
     self.assertTrue(repr(b) != repr(c))
@@ -333,7 +333,7 @@ 
transform_evaluator.TransformEvaluatorRegistry._test_evaluators_overrides = {
 class TestReadFromPubSub(unittest.TestCase):
 
   def test_read_messages_success(self, mock_pubsub):
-    data = 'data'
+    data = b'data'
     publish_time_secs = 1520861821
     publish_time_nanos = 234567000
     attributes = {'key': 'value'}
@@ -399,7 +399,7 @@ class TestReadFromPubSub(unittest.TestCase):
         mock.call(mock.ANY, [ack_id])])
 
   def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
-    data = 'data'
+    data = b'data'
     attributes = {'time': '1337'}
     publish_time_secs = 1520861821
     publish_time_nanos = 234567000
@@ -429,7 +429,7 @@ class TestReadFromPubSub(unittest.TestCase):
         mock.call(mock.ANY, [ack_id])])
 
   def test_read_messages_timestamp_attribute_rfc3339_success(self, 
mock_pubsub):
-    data = 'data'
+    data = b'data'
     attributes = {'time': '2018-03-12T13:37:01.234567Z'}
     publish_time_secs = 1337000000
     publish_time_nanos = 133700000
@@ -459,7 +459,7 @@ class TestReadFromPubSub(unittest.TestCase):
         mock.call(mock.ANY, [ack_id])])
 
   def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
-    data = 'data'
+    data = b'data'
     attributes = {}
     publish_time_secs = 1520861821
     publish_time_nanos = 234567000
@@ -490,7 +490,7 @@ class TestReadFromPubSub(unittest.TestCase):
         mock.call(mock.ANY, [ack_id])])
 
   def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
-    data = 'data'
+    data = b'data'
     attributes = {'time': '1337 unparseable'}
     publish_time_secs = 1520861821
     publish_time_nanos = 234567000
@@ -557,7 +557,7 @@ class TestWriteToPubSub(unittest.TestCase):
         mock.call(mock.ANY, data)])
 
   def test_write_messages_with_attributes_success(self, mock_pubsub):
-    data = 'data'
+    data = b'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
 
@@ -589,7 +589,7 @@ class TestWriteToPubSub(unittest.TestCase):
       p.run()
 
   def test_write_messages_unsupported_features(self, mock_pubsub):
-    data = 'data'
+    data = b'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
 
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py 
b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 1261aa1..bfed329 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -59,7 +59,7 @@ class PubSubMatcherTest(unittest.TestCase):
 
   def test_message_matcher_success(self, mock_get_sub, unsued_mock):
     self.init_matcher()
-    self.pubsub_matcher.expected_msg = ['a', 'b']
+    self.pubsub_matcher.expected_msg = [b'a', b'b']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
         create_pull_response([PullResponseMessage(b'a', {})]),
@@ -71,7 +71,7 @@ class PubSubMatcherTest(unittest.TestCase):
 
   def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
-    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+    self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
         create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
@@ -82,7 +82,7 @@ class PubSubMatcherTest(unittest.TestCase):
 
   def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
-    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})]
+    self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {})]
     mock_sub = mock_get_sub.return_value
     # Unexpected attribute 'k'.
     mock_sub.pull.side_effect = [
@@ -96,7 +96,7 @@ class PubSubMatcherTest(unittest.TestCase):
   def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
-    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+    self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [create_pull_response([
         PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
@@ -108,7 +108,7 @@ class PubSubMatcherTest(unittest.TestCase):
   def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
-    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+    self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
     # Message is missing attribute 'timestamp'.
     mock_sub.pull.side_effect = [create_pull_response([
@@ -130,7 +130,7 @@ class PubSubMatcherTest(unittest.TestCase):
     with self.assertRaises(AssertionError) as error:
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
-    self.assertCountEqual(['c', 'd'], self.pubsub_matcher.messages)
+    self.assertCountEqual([b'c', b'd'], self.pubsub_matcher.messages)
     self.assertTrue(
         '\nExpected: Expected 1 messages.\n     but: Got 2 messages.'
         in str(error.exception.args[0]))
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index dbccd56..fc3bad2 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -99,7 +99,7 @@ setenv =
   RUN_SKIPPED_PY3_TESTS=0
 extras = test,gcp
 modules =
-  
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.i
 [...]
+  
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.i
 [...]
 commands =
   python --version
   pip --version

Reply via email to