This is an automated email from the ASF dual-hosted git repository. ccy pushed a commit to branch revert-6674-rev1 in repository https://gitbox.apache.org/repos/asf/beam.git
commit cda0cabf6935c41290a1ab0265e0ef814c59df1b Author: Charles Chen <charlesccyc...@users.noreply.github.com> AuthorDate: Mon Oct 15 13:29:16 2018 -0700 Revert "[BEAM-5706] Revert 324f0b3e3c (pull request #6564 from udim/pubsub-0-35-4)" --- .../examples/complete/game/game_stats_it_test.py | 31 ++- .../examples/complete/game/leader_board_it_test.py | 32 +-- .../examples/streaming_wordcount_it_test.py | 45 ++-- sdks/python/apache_beam/io/gcp/pubsub.py | 13 +- .../apache_beam/io/gcp/pubsub_integration_test.py | 39 +-- sdks/python/apache_beam/io/gcp/pubsub_test.py | 270 ++++++--------------- .../apache_beam/io/gcp/tests/pubsub_matcher.py | 44 ++-- .../io/gcp/tests/pubsub_matcher_test.py | 75 +++--- .../apache_beam/runners/direct/direct_runner.py | 37 +-- .../runners/direct/test_direct_runner.py | 1 + .../runners/direct/transform_evaluator.py | 98 +++++--- sdks/python/apache_beam/testing/test_utils.py | 83 ++++--- sdks/python/apache_beam/testing/test_utils_test.py | 57 +---- sdks/python/container/base_image_requirements.txt | 3 +- sdks/python/setup.py | 3 +- 15 files changed, 350 insertions(+), 481 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py index 6dc60d0..2fc19da 100644 --- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py +++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py @@ -72,15 +72,15 @@ class GameStatsIT(unittest.TestCase): # Set up PubSub environment. from google.cloud import pubsub - self.pubsub_client = pubsub.Client(project=self.project) - unique_topic_name = self.INPUT_TOPIC + _unique_id - unique_subscrition_name = self.INPUT_SUB + _unique_id - self.input_topic = self.pubsub_client.topic(unique_topic_name) - self.input_sub = self.input_topic.subscription(unique_subscrition_name) + self.pub_client = pubsub.PublisherClient() + self.input_topic = self.pub_client.create_topic( + self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id)) - self.input_topic.create() - test_utils.wait_for_topics_created([self.input_topic]) - self.input_sub.create() + self.sub_client = pubsub.SubscriberClient() + self.input_sub = self.sub_client.create_subscription( + self.sub_client.subscription_path(self.project, + self.INPUT_SUB + _unique_id), + self.input_topic.name) # Set up BigQuery environment from google.cloud import bigquery @@ -95,14 +95,15 @@ class GameStatsIT(unittest.TestCase): """Inject game events as test data to PubSub.""" logging.debug('Injecting %d game events to topic %s', - message_count, topic.full_name) + message_count, topic.name) for _ in range(message_count): - topic.publish(self.INPUT_EVENT % self._test_timestamp) + self.pub_client.publish(topic.name, + self.INPUT_EVENT % self._test_timestamp) def _cleanup_pubsub(self): - test_utils.cleanup_subscriptions([self.input_sub]) - test_utils.cleanup_topics([self.input_topic]) + test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub]) + test_utils.cleanup_topics(self.pub_client, [self.input_topic]) def _cleanup_dataset(self): self.dataset.delete() @@ -123,9 +124,9 @@ class GameStatsIT(unittest.TestCase): # TODO(mariagh): Add teams table verifier once game_stats.py is fixed. - extra_opts = {'subscription': self.input_sub.full_name, + extra_opts = {'subscription': self.input_sub.name, 'dataset': self.dataset.name, - 'topic': self.input_topic.full_name, + 'topic': self.input_topic.name, 'fixed_window_duration': 1, 'user_activity_window_duration': 1, 'wait_until_finish_duration': @@ -143,8 +144,6 @@ class GameStatsIT(unittest.TestCase): self.dataset.name, self.OUTPUT_TABLE_TEAMS) # Generate input data and inject to PubSub. - test_utils.wait_for_subscriptions_created([self.input_topic, - self.input_sub]) self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT) # Get pipeline options from command argument: --test-pipeline-options, diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py index ab10942..e0e309b 100644 --- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py +++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py @@ -73,15 +73,16 @@ class LeaderBoardIT(unittest.TestCase): # Set up PubSub environment. from google.cloud import pubsub - self.pubsub_client = pubsub.Client(project=self.project) - unique_topic_name = self.INPUT_TOPIC + _unique_id - unique_subscrition_name = self.INPUT_SUB + _unique_id - self.input_topic = self.pubsub_client.topic(unique_topic_name) - self.input_sub = self.input_topic.subscription(unique_subscrition_name) - self.input_topic.create() - test_utils.wait_for_topics_created([self.input_topic]) - self.input_sub.create() + self.pub_client = pubsub.PublisherClient() + self.input_topic = self.pub_client.create_topic( + self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id)) + + self.sub_client = pubsub.SubscriberClient() + self.input_sub = self.sub_client.create_subscription( + self.sub_client.subscription_path(self.project, + self.INPUT_SUB + _unique_id), + self.input_topic.name) # Set up BigQuery environment from google.cloud import bigquery @@ -96,14 +97,15 @@ class LeaderBoardIT(unittest.TestCase): """Inject game events as test data to PubSub.""" logging.debug('Injecting %d game events to topic %s', - message_count, topic.full_name) + message_count, topic.name) for _ in range(message_count): - topic.publish(self.INPUT_EVENT % self._test_timestamp) + self.pub_client.publish(topic.name, + self.INPUT_EVENT % self._test_timestamp) def _cleanup_pubsub(self): - test_utils.cleanup_subscriptions([self.input_sub]) - test_utils.cleanup_topics([self.input_topic]) + test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub]) + test_utils.cleanup_topics(self.pub_client, [self.input_topic]) def _cleanup_dataset(self): self.dataset.delete() @@ -131,9 +133,9 @@ class LeaderBoardIT(unittest.TestCase): teams_query, self.DEFAULT_EXPECTED_CHECKSUM) - extra_opts = {'subscription': self.input_sub.full_name, + extra_opts = {'subscription': self.input_sub.name, 'dataset': self.dataset.name, - 'topic': self.input_topic.full_name, + 'topic': self.input_topic.name, 'team_window_duration': 1, 'wait_until_finish_duration': self.WAIT_UNTIL_FINISH_DURATION, @@ -151,8 +153,6 @@ class LeaderBoardIT(unittest.TestCase): self.dataset.name, self.OUTPUT_TABLE_TEAMS) # Generate input data and inject to PubSub. - test_utils.wait_for_subscriptions_created([self.input_topic, - self.input_sub]) self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT) # Get pipeline options from command argument: --test-pipeline-options, diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py index 3c0cfa9..78e89a1 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py @@ -52,31 +52,31 @@ class StreamingWordCountIT(unittest.TestCase): # Set up PubSub environment. from google.cloud import pubsub - self.pubsub_client = pubsub.Client(project=self.project) - self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid) - self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid) - self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid) - self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid) - - self.input_topic.create() - self.output_topic.create() - test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) - self.input_sub.create() - self.output_sub.create() + self.pub_client = pubsub.PublisherClient() + self.input_topic = self.pub_client.create_topic( + self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid)) + self.output_topic = self.pub_client.create_topic( + self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid)) + + self.sub_client = pubsub.SubscriberClient() + self.input_sub = self.sub_client.create_subscription( + self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid), + self.input_topic.name) + self.output_sub = self.sub_client.create_subscription( + self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid), + self.output_topic.name) def _inject_numbers(self, topic, num_messages): """Inject numbers as test data to PubSub.""" - logging.debug('Injecting %d numbers to topic %s', - num_messages, topic.full_name) + logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name) for n in range(num_messages): - topic.publish(str(n)) - - def _cleanup_pubsub(self): - test_utils.cleanup_subscriptions([self.input_sub, self.output_sub]) - test_utils.cleanup_topics([self.input_topic, self.output_topic]) + self.pub_client.publish(self.input_topic.name, str(n)) def tearDown(self): - self._cleanup_pubsub() + test_utils.cleanup_subscriptions(self.sub_client, + [self.input_sub, self.output_sub]) + test_utils.cleanup_topics(self.pub_client, + [self.input_topic, self.output_topic]) @attr('IT') def test_streaming_wordcount_it(self): @@ -86,17 +86,16 @@ class StreamingWordCountIT(unittest.TestCase): # Set extra options to the pipeline for test purpose state_verifier = PipelineStateMatcher(PipelineState.RUNNING) pubsub_msg_verifier = PubSubMessageMatcher(self.project, - OUTPUT_SUB + self.uuid, + self.output_sub.name, expected_msg, timeout=400) - extra_opts = {'input_subscription': self.input_sub.full_name, - 'output_topic': self.output_topic.full_name, + extra_opts = {'input_subscription': self.input_sub.name, + 'output_topic': self.output_topic.name, 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION, 'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)} # Generate input data and inject to PubSub. - test_utils.wait_for_subscriptions_created([self.input_sub]) self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS) # Get pipeline options from command argument: --test-pipeline-options, diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 2414194..a1644ab 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -38,11 +38,10 @@ from apache_beam.transforms import PTransform from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils.annotations import deprecated -# The protobuf library is only used for running on Dataflow. try: - from google.cloud.proto.pubsub.v1 import pubsub_pb2 + from google.cloud import pubsub except ImportError: - pubsub_pb2 = None + pubsub = None __all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub', 'WriteStringsToPubSub', 'WriteToPubSub'] @@ -92,7 +91,7 @@ class PubsubMessage(object): Returns: A new PubsubMessage object. """ - msg = pubsub_pb2.PubsubMessage() + msg = pubsub.types.pubsub_pb2.PubsubMessage() msg.ParseFromString(proto_msg) # Convert ScalarMapContainer to dict. attributes = dict((key, msg.attributes[key]) for key in msg.attributes) @@ -109,7 +108,7 @@ class PubsubMessage(object): https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage containing the payload of this object. """ - msg = pubsub_pb2.PubsubMessage() + msg = pubsub.types.pubsub_pb2.PubsubMessage() msg.data = self.data for key, value in self.attributes.iteritems(): msg.attributes[key] = value @@ -117,9 +116,9 @@ class PubsubMessage(object): @staticmethod def _from_message(msg): - """Construct from ``google.cloud.pubsub.message.Message``. + """Construct from ``google.cloud.pubsub_v1.subscriber.message.Message``. - https://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/api/message.html + https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html """ # Convert ScalarMapContainer to dict. attributes = dict((key, msg.attributes[key]) for key in msg.attributes) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 9bb81fc..5b060e5 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -100,21 +100,25 @@ class PubSubIntegrationTest(unittest.TestCase): # Set up PubSub environment. from google.cloud import pubsub - self.pubsub_client = pubsub.Client(project=self.project) - self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid) - self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid) - self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid) - self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid) - - self.input_topic.create() - self.output_topic.create() - test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) - self.input_sub.create() - self.output_sub.create() + self.pub_client = pubsub.PublisherClient() + self.input_topic = self.pub_client.create_topic( + self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid)) + self.output_topic = self.pub_client.create_topic( + self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid)) + + self.sub_client = pubsub.SubscriberClient() + self.input_sub = self.sub_client.create_subscription( + self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid), + self.input_topic.name) + self.output_sub = self.sub_client.create_subscription( + self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid), + self.output_topic.name) def tearDown(self): - test_utils.cleanup_subscriptions([self.input_sub, self.output_sub]) - test_utils.cleanup_topics([self.input_topic, self.output_topic]) + test_utils.cleanup_subscriptions(self.sub_client, + [self.input_sub, self.output_sub]) + test_utils.cleanup_topics(self.pub_client, + [self.input_topic, self.output_topic]) def _test_streaming(self, with_attributes): """Runs IT pipeline with message verifier. @@ -139,21 +143,20 @@ class PubSubIntegrationTest(unittest.TestCase): strip_attributes = [self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE] pubsub_msg_verifier = PubSubMessageMatcher( self.project, - OUTPUT_SUB + self.uuid, + self.output_sub.name, expected_messages, timeout=MESSAGE_MATCHER_TIMEOUT_S, with_attributes=with_attributes, strip_attributes=strip_attributes) - extra_opts = {'input_subscription': self.input_sub.full_name, - 'output_topic': self.output_topic.full_name, + extra_opts = {'input_subscription': self.input_sub.name, + 'output_topic': self.output_topic.name, 'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS, 'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)} # Generate input data and inject to PubSub. - test_utils.wait_for_subscriptions_created([self.input_sub]) for msg in self.INPUT_MESSAGES[self.runner_name]: - self.input_topic.publish(msg.data, **msg.attributes) + self.pub_client.publish(self.input_topic.name, msg.data, **msg.attributes) # Get pipeline options from command argument: --test-pipeline-options, # and start pipeline job by calling pipeline main function. diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 6e19950..a95ffc6 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -20,12 +20,9 @@ from __future__ import absolute_import -import functools import logging import unittest from builtins import object -from builtins import range -from builtins import zip import hamcrest as hc import mock @@ -43,6 +40,7 @@ from apache_beam.runners.direct import transform_evaluator from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub from apache_beam.runners.direct.direct_runner import _get_transform_overrides from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator +from apache_beam.testing import test_utils from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import TestWindowedValue from apache_beam.testing.util import assert_that @@ -54,18 +52,10 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils import timestamp # Protect against environments where the PubSub library is not available. -# pylint: disable=wrong-import-order, wrong-import-position try: from google.cloud import pubsub except ImportError: pubsub = None -# pylint: enable=wrong-import-order, wrong-import-position - -# The protobuf library is only used for running on Dataflow. -try: - from google.cloud.proto.pubsub.v1 import pubsub_pb2 -except ImportError: - pubsub_pb2 = None class TestPubsubMessage(unittest.TestCase): @@ -81,8 +71,7 @@ class TestPubsubMessage(unittest.TestCase): with self.assertRaisesRegexp(ValueError, r'data.*attributes.*must be set'): _ = PubsubMessage(None, {}) - @unittest.skipIf(pubsub_pb2 is None, - 'PubSub proto dependencies are not installed') + @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') def test_proto_conversion(self): data = 'data' attributes = {'k1': 'v1', 'k2': 'v2'} @@ -220,7 +209,7 @@ class TestWriteStringsToPubSubOverride(unittest.TestCase): write_transform = pcoll.producer.inputs[0].producer.transform # Ensure that the properties passed through correctly - self.assertEqual('a_topic', write_transform.dofn.topic_name) + self.assertEqual('a_topic', write_transform.dofn.short_topic_name) def test_expand(self): p = TestPipeline() @@ -240,7 +229,7 @@ class TestWriteStringsToPubSubOverride(unittest.TestCase): write_transform = pcoll.producer.inputs[0].producer.transform # Ensure that the properties passed through correctly - self.assertEqual('a_topic', write_transform.dofn.topic_name) + self.assertEqual('a_topic', write_transform.dofn.short_topic_name) self.assertEqual(True, write_transform.dofn.with_attributes) # TODO(BEAM-4275): These properties aren't supported yet in direct runner. self.assertEqual(None, write_transform.dofn.id_label) @@ -333,118 +322,25 @@ transform_evaluator.TransformEvaluatorRegistry._test_evaluators_overrides = { } -class FakePubsubTopic(object): - - def __init__(self, name, client): - self.name = name - self.client = client - - def subscription(self, name): - return FakePubsubSubscription(name, self.name, self.client) - - def batch(self): - if self.client.batch is None: - self.client.batch = FakeBatch(self.client) - return self.client.batch - - -class FakePubsubSubscription(object): - - def __init__(self, name, topic, client): - self.name = name - self.topic = topic - self.client = client - - def create(self): - pass - - -class FakeAutoAck(object): - - def __init__(self, sub, **unused_kwargs): - self.sub = sub - - def __enter__(self): - messages = self.sub.client.messages_read - self.ack_id_to_msg = dict(zip(range(len(messages)), messages)) - return self.ack_id_to_msg - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - -class FakeBatch(object): - """Context manager that accept Pubsub client writes via publish(). - - Verifies writes on exit. - """ - - def __init__(self, client): - self.client = client - self.published = [] - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is not None: - return # Exception will be raised. - hc.assert_that(self.published, - hc.only_contains(*self.client.messages_write)) - - def publish(self, message, **attrs): - self.published.append([message, attrs]) - - -class FakePubsubClient(object): - - def __init__(self, messages_read=None, messages_write=None, project=None, - **unused_kwargs): - """Creates a Pubsub client fake. - - Args: - messages_read: List of PubsubMessage objects to return. - messages_write: List of [data, attributes] pairs, corresponding to - messages expected to be written to the client. - project: Name of GCP project. - """ - self.messages_read = messages_read - self.messages_write = messages_write - self.project = project - self.batch = None - - def topic(self, name): - return FakePubsubTopic(name, self) - - -def create_client_message(data, message_id, attributes, publish_time): - """Returns a message as it would be returned from Cloud Pub/Sub client. - - This is what the reader sees. - """ - msg = pubsub.message.Message(data, message_id, attributes) - msg._service_timestamp = publish_time - return msg - - @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') +@mock.patch('google.cloud.pubsub.SubscriberClient') class TestReadFromPubSub(unittest.TestCase): - @mock.patch('google.cloud.pubsub') def test_read_messages_success(self, mock_pubsub): data = 'data' - message_id = 'message_id' - publish_time = '2018-03-12T13:37:01.234567Z' + publish_time_secs = 1520861821 + publish_time_nanos = 234567000 attributes = {'key': 'value'} - payloads = [create_client_message( - data, message_id, attributes, publish_time)] + ack_id = 'ack_id' + pull_response = test_utils.create_pull_response([ + test_utils.PullResponseMessage( + data, attributes, publish_time_secs, publish_time_nanos, ack_id) + ]) expected_elements = [ TestWindowedValue(PubsubMessage(data, attributes), timestamp.Timestamp(1520861821.234567), [window.GlobalWindow()])] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck + mock_pubsub.return_value.pull.return_value = pull_response p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -453,17 +349,18 @@ class TestReadFromPubSub(unittest.TestCase): None, None, with_attributes=True)) assert_that(pcoll, equal_to(expected_elements), reify_windows=True) p.run() + mock_pubsub.return_value.acknowledge.assert_has_calls([ + mock.call(mock.ANY, [ack_id])]) - @mock.patch('google.cloud.pubsub') def test_read_strings_success(self, mock_pubsub): data = u'🤷 ¯\\_(ツ)_/¯' data_encoded = data.encode('utf-8') - publish_time = '2018-03-12T13:37:01.234567Z' - payloads = [create_client_message(data_encoded, None, None, publish_time)] + ack_id = 'ack_id' + pull_response = test_utils.create_pull_response([ + test_utils.PullResponseMessage(data_encoded, ack_id=ack_id) + ]) expected_elements = [data] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck + mock_pubsub.return_value.pull.return_value = pull_response p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -472,16 +369,16 @@ class TestReadFromPubSub(unittest.TestCase): None, None)) assert_that(pcoll, equal_to(expected_elements)) p.run() + mock_pubsub.return_value.acknowledge.assert_has_calls([ + mock.call(mock.ANY, [ack_id])]) - @mock.patch('google.cloud.pubsub') def test_read_data_success(self, mock_pubsub): data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8') - publish_time = '2018-03-12T13:37:01.234567Z' - payloads = [create_client_message(data_encoded, None, None, publish_time)] + ack_id = 'ack_id' + pull_response = test_utils.create_pull_response([ + test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)]) expected_elements = [data_encoded] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck + mock_pubsub.return_value.pull.return_value = pull_response p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -489,24 +386,26 @@ class TestReadFromPubSub(unittest.TestCase): | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None)) assert_that(pcoll, equal_to(expected_elements)) p.run() + mock_pubsub.return_value.acknowledge.assert_has_calls([ + mock.call(mock.ANY, [ack_id])]) - @mock.patch('google.cloud.pubsub') def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub): data = 'data' - message_id = 'message_id' attributes = {'time': '1337'} - publish_time = '2018-03-12T13:37:01.234567Z' - payloads = [ - create_client_message(data, message_id, attributes, publish_time)] + publish_time_secs = 1520861821 + publish_time_nanos = 234567000 + ack_id = 'ack_id' + pull_response = test_utils.create_pull_response([ + test_utils.PullResponseMessage( + data, attributes, publish_time_secs, publish_time_nanos, ack_id) + ]) expected_elements = [ TestWindowedValue( PubsubMessage(data, attributes), timestamp.Timestamp(micros=int(attributes['time']) * 1000), [window.GlobalWindow()]), ] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck + mock_pubsub.return_value.pull.return_value = pull_response p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -516,24 +415,26 @@ class TestReadFromPubSub(unittest.TestCase): with_attributes=True, timestamp_attribute='time')) assert_that(pcoll, equal_to(expected_elements), reify_windows=True) p.run() + mock_pubsub.return_value.acknowledge.assert_has_calls([ + mock.call(mock.ANY, [ack_id])]) - @mock.patch('google.cloud.pubsub') def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub): data = 'data' - message_id = 'message_id' attributes = {'time': '2018-03-12T13:37:01.234567Z'} - publish_time = '2018-03-12T13:37:01.234567Z' - payloads = [ - create_client_message(data, message_id, attributes, publish_time)] + publish_time_secs = 1337000000 + publish_time_nanos = 133700000 + ack_id = 'ack_id' + pull_response = test_utils.create_pull_response([ + test_utils.PullResponseMessage( + data, attributes, publish_time_secs, publish_time_nanos, ack_id) + ]) expected_elements = [ TestWindowedValue( PubsubMessage(data, attributes), timestamp.Timestamp.from_rfc3339(attributes['time']), [window.GlobalWindow()]), ] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck + mock_pubsub.return_value.pull.return_value = pull_response p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -543,24 +444,27 @@ class TestReadFromPubSub(unittest.TestCase): with_attributes=True, timestamp_attribute='time')) assert_that(pcoll, equal_to(expected_elements), reify_windows=True) p.run() + mock_pubsub.return_value.acknowledge.assert_has_calls([ + mock.call(mock.ANY, [ack_id])]) - @mock.patch('google.cloud.pubsub') def test_read_messages_timestamp_attribute_missing(self, mock_pubsub): data = 'data' - message_id = 'message_id' attributes = {} + publish_time_secs = 1520861821 + publish_time_nanos = 234567000 publish_time = '2018-03-12T13:37:01.234567Z' - payloads = [ - create_client_message(data, message_id, attributes, publish_time)] + ack_id = 'ack_id' + pull_response = test_utils.create_pull_response([ + test_utils.PullResponseMessage( + data, attributes, publish_time_secs, publish_time_nanos, ack_id) + ]) expected_elements = [ TestWindowedValue( PubsubMessage(data, attributes), timestamp.Timestamp.from_rfc3339(publish_time), [window.GlobalWindow()]), ] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck + mock_pubsub.return_value.pull.return_value = pull_response p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -570,18 +474,20 @@ class TestReadFromPubSub(unittest.TestCase): with_attributes=True, timestamp_attribute='nonexistent')) assert_that(pcoll, equal_to(expected_elements), reify_windows=True) p.run() + mock_pubsub.return_value.acknowledge.assert_has_calls([ + mock.call(mock.ANY, [ack_id])]) - @mock.patch('google.cloud.pubsub') def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub): data = 'data' - message_id = 'message_id' attributes = {'time': '1337 unparseable'} - publish_time = '2018-03-12T13:37:01.234567Z' - payloads = [ - create_client_message(data, message_id, attributes, publish_time)] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck + publish_time_secs = 1520861821 + publish_time_nanos = 234567000 + ack_id = 'ack_id' + pull_response = test_utils.create_pull_response([ + test_utils.PullResponseMessage( + data, attributes, publish_time_secs, publish_time_nanos, ack_id) + ]) + mock_pubsub.return_value.pull.return_value = pull_response p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -591,20 +497,10 @@ class TestReadFromPubSub(unittest.TestCase): with_attributes=True, timestamp_attribute='time')) with self.assertRaisesRegexp(ValueError, r'parse'): p.run() + mock_pubsub.return_value.acknowledge.assert_not_called() - @mock.patch('google.cloud.pubsub') - def test_read_message_id_label_unsupported(self, mock_pubsub): + def test_read_message_id_label_unsupported(self, unused_mock_pubsub): # id_label is unsupported in DirectRunner. - data = 'data' - message_id = 'message_id' - attributes = {'time': '1337 unparseable'} - publish_time = '2018-03-12T13:37:01.234567Z' - payloads = [ - create_client_message(data, message_id, attributes, publish_time)] - - mock_pubsub.Client = functools.partial(FakePubsubClient, payloads) - mock_pubsub.subscription.AutoAck = FakeAutoAck - p = TestPipeline() p.options.view_as(StandardOptions).streaming = True _ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label')) @@ -614,16 +510,12 @@ class TestReadFromPubSub(unittest.TestCase): @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') +@mock.patch('google.cloud.pubsub.PublisherClient') class TestWriteToPubSub(unittest.TestCase): - @mock.patch('google.cloud.pubsub') def test_write_messages_success(self, mock_pubsub): data = 'data' payloads = [data] - expected_payloads = [[data, {}]] - - mock_pubsub.Client = functools.partial(FakePubsubClient, - messages_write=expected_payloads) p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -632,15 +524,12 @@ class TestWriteToPubSub(unittest.TestCase): | WriteToPubSub('projects/fakeprj/topics/a_topic', with_attributes=False)) p.run() + mock_pubsub.return_value.publish.assert_has_calls([ + mock.call(mock.ANY, data)]) - @mock.patch('google.cloud.pubsub') def test_write_messages_deprecated(self, mock_pubsub): data = 'data' payloads = [data] - expected_payloads = [[data, {}]] - - mock_pubsub.Client = functools.partial(FakePubsubClient, - messages_write=expected_payloads) p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -648,16 +537,13 @@ class TestWriteToPubSub(unittest.TestCase): | Create(payloads) | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')) p.run() + mock_pubsub.return_value.publish.assert_has_calls([ + mock.call(mock.ANY, data)]) - @mock.patch('google.cloud.pubsub') def test_write_messages_with_attributes_success(self, mock_pubsub): data = 'data' attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] - expected_payloads = [[data, attributes]] - - mock_pubsub.Client = functools.partial(FakePubsubClient, - messages_write=expected_payloads) p = TestPipeline() p.options.view_as(StandardOptions).streaming = True @@ -666,15 +552,14 @@ class TestWriteToPubSub(unittest.TestCase): | WriteToPubSub('projects/fakeprj/topics/a_topic', with_attributes=True)) p.run() + mock_pubsub.return_value.publish.assert_has_calls([ + mock.call(mock.ANY, data, **attributes)]) - @mock.patch('google.cloud.pubsub') def test_write_messages_with_attributes_error(self, mock_pubsub): data = 'data' # Sending raw data when WriteToPubSub expects a PubsubMessage object. payloads = [data] - mock_pubsub.Client = functools.partial(FakePubsubClient) - p = TestPipeline() p.options.view_as(StandardOptions).streaming = True _ = (p @@ -685,15 +570,10 @@ class TestWriteToPubSub(unittest.TestCase): r'str.*has no attribute.*data'): p.run() - @mock.patch('google.cloud.pubsub') def test_write_messages_unsupported_features(self, mock_pubsub): data = 'data' attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] - expected_payloads = [[data, attributes]] - - mock_pubsub.Client = functools.partial(FakePubsubClient, - messages_write=expected_payloads) p = TestPipeline() p.options.view_as(StandardOptions).streaming = True diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py index 6217faf..7a0b5c8 100644 --- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py +++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py @@ -31,12 +31,10 @@ __all__ = ['PubSubMessageMatcher'] # Protect against environments where pubsub library is not available. -# pylint: disable=wrong-import-order, wrong-import-position try: from google.cloud import pubsub except ImportError: pubsub = None -# pylint: enable=wrong-import-order, wrong-import-position DEFAULT_TIMEOUT = 5 * 60 MAX_MESSAGES_IN_ONE_PULL = 50 @@ -49,8 +47,9 @@ class PubSubMessageMatcher(BaseMatcher): subscription until all expected messages are shown or timeout. """ - def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT, - with_attributes=False, strip_attributes=None): + def __init__(self, project, sub_name, expected_msg, + timeout=DEFAULT_TIMEOUT, with_attributes=False, + strip_attributes=None): """Initialize PubSubMessageMatcher object. Args: @@ -59,8 +58,9 @@ class PubSubMessageMatcher(BaseMatcher): expected_msg: A string list that contains expected message data pulled from the subscription. See also: with_attributes. timeout: Timeout in seconds to wait for all expected messages appears. - with_attributes: Whether expected_msg is a list of - ``PubsubMessage`` objects. + with_attributes: If True, will match against both message data and + attributes. If True, expected_msg should be a list of ``PubsubMessage`` + objects. Otherwise, it should be a list of ``bytes``. strip_attributes: List of strings. If with_attributes==True, strip the attributes keyed by these values from incoming messages. If a key is missing, will add an attribute with an error message as @@ -86,28 +86,26 @@ class PubSubMessageMatcher(BaseMatcher): def _matches(self, _): if self.messages is None: - self.messages = self._wait_for_messages(self._get_subscription(), - len(self.expected_msg), + self.messages = self._wait_for_messages(len(self.expected_msg), self.timeout) return Counter(self.messages) == Counter(self.expected_msg) - def _get_subscription(self): - return pubsub.Client(project=self.project).subscription(self.sub_name) - - def _wait_for_messages(self, subscription, expected_num, timeout): + def _wait_for_messages(self, expected_num, timeout): """Wait for messages from given subscription.""" - logging.debug('Start pulling messages from %s', subscription.full_name) total_messages = [] + + sub_client = pubsub.SubscriberClient() start_time = time.time() while time.time() - start_time <= timeout: - pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL) - for ack_id, message in pulled: - subscription.acknowledge([ack_id]) + response = sub_client.pull(self.sub_name, + max_messages=MAX_MESSAGES_IN_ONE_PULL, + return_immediately=True) + for rm in response.received_messages: + msg = PubsubMessage._from_message(rm.message) if not self.with_attributes: - total_messages.append(message.data) + total_messages.append(msg.data) continue - msg = PubsubMessage._from_message(message) if self.strip_attributes: for attr in self.strip_attributes: try: @@ -117,12 +115,16 @@ class PubSubMessageMatcher(BaseMatcher): 'expected attribute not found.') total_messages.append(msg) + ack_ids = [rm.ack_id for rm in response.received_messages] + if ack_ids: + sub_client.acknowledge(self.sub_name, ack_ids) if len(total_messages) >= expected_num: - return total_messages + break time.sleep(1) - logging.error('Timeout after %d sec. Received %d messages from %s.', - timeout, len(total_messages), subscription.full_name) + if time.time() - start_time > timeout: + logging.error('Timeout after %d sec. Received %d messages from %s.', + timeout, len(total_messages), self.sub_name) return total_messages def describe_to(self, description): diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py index 9047763..1261aa1 100644 --- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py @@ -28,17 +28,19 @@ from hamcrest import assert_that as hc_assert_that from apache_beam.io.gcp.pubsub import PubsubMessage from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher +from apache_beam.testing.test_utils import PullResponseMessage +from apache_beam.testing.test_utils import create_pull_response # Protect against environments where pubsub library is not available. -# pylint: disable=wrong-import-order, wrong-import-position try: from google.cloud import pubsub except ImportError: pubsub = None -# pylint: enable=wrong-import-order, wrong-import-position @unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.') +@mock.patch('time.sleep', return_value=None) +@mock.patch('google.cloud.pubsub.SubscriberClient') class PubSubMatcherTest(unittest.TestCase): @classmethod @@ -55,90 +57,75 @@ class PubSubMatcherTest(unittest.TestCase): 'mock_project', 'mock_sub_name', ['mock_expected_msg'], with_attributes=with_attributes, strip_attributes=strip_attributes) - @mock.patch('time.sleep', return_value=None) - @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.' - 'PubSubMessageMatcher._get_subscription') def test_message_matcher_success(self, mock_get_sub, unsued_mock): self.init_matcher() self.pubsub_matcher.expected_msg = ['a', 'b'] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ - [(1, pubsub.message.Message(b'a', 'unused_id'))], - [(2, pubsub.message.Message(b'b', 'unused_id'))], + create_pull_response([PullResponseMessage(b'a', {})]), + create_pull_response([PullResponseMessage(b'b', {})]), ] hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 2) + self.assertEqual(mock_sub.acknowledge.call_count, 2) - @mock.patch('time.sleep', return_value=None) - @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.' - 'PubSubMessageMatcher._get_subscription') def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True) self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})] mock_sub = mock_get_sub.return_value - msg_a = pubsub.message.Message(b'a', 'unused_id') - msg_a.attributes['k'] = 'v' - mock_sub.pull.side_effect = [[(1, msg_a)]] + mock_sub.pull.side_effect = [ + create_pull_response([PullResponseMessage(b'a', {'k': 'v'})]) + ] hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) + self.assertEqual(mock_sub.acknowledge.call_count, 1) - @mock.patch('time.sleep', return_value=None) - @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.' - 'PubSubMessageMatcher._get_subscription') def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True) self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})] mock_sub = mock_get_sub.return_value - msg_a = pubsub.message.Message(b'a', 'unused_id') - msg_a.attributes['k'] = 'v' # Unexpected. - mock_sub.pull.side_effect = [[(1, msg_a)]] + # Unexpected attribute 'k'. + mock_sub.pull.side_effect = [ + create_pull_response([PullResponseMessage(b'a', {'k': 'v'})]) + ] with self.assertRaisesRegexp(AssertionError, r'Unexpected'): hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) + self.assertEqual(mock_sub.acknowledge.call_count, 1) - @mock.patch('time.sleep', return_value=None) - @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.' - 'PubSubMessageMatcher._get_subscription') def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True, strip_attributes=['id', 'timestamp']) self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})] mock_sub = mock_get_sub.return_value - msg_a = pubsub.message.Message(b'a', 'unused_id') - msg_a.attributes['id'] = 'foo' - msg_a.attributes['timestamp'] = 'bar' - msg_a.attributes['k'] = 'v' - mock_sub.pull.side_effect = [[(1, msg_a)]] + mock_sub.pull.side_effect = [create_pull_response([ + PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'}) + ])] hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) + self.assertEqual(mock_sub.acknowledge.call_count, 1) - @mock.patch('time.sleep', return_value=None) - @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.' - 'PubSubMessageMatcher._get_subscription') def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True, strip_attributes=['id', 'timestamp']) self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})] mock_sub = mock_get_sub.return_value - # msg_a is missing attribute 'timestamp'. - msg_a = pubsub.message.Message(b'a', 'unused_id') - msg_a.attributes['id'] = 'foo' - msg_a.attributes['k'] = 'v' - mock_sub.pull.side_effect = [[(1, msg_a)]] + # Message is missing attribute 'timestamp'. + mock_sub.pull.side_effect = [create_pull_response([ + PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'}) + ])] with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'): hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) + self.assertEqual(mock_sub.acknowledge.call_count, 1) - @mock.patch('time.sleep', return_value=None) - @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.' - 'PubSubMessageMatcher._get_subscription') def test_message_matcher_mismatch(self, mock_get_sub, unused_mock): self.init_matcher() self.pubsub_matcher.expected_msg = ['a'] mock_sub = mock_get_sub.return_value - mock_sub.pull.return_value = [ - (1, pubsub.message.Message(b'c', 'unused_id')), - (1, pubsub.message.Message(b'd', 'unused_id')), + mock_sub.pull.side_effect = [ + create_pull_response([PullResponseMessage(b'c', {}), + PullResponseMessage(b'd', {})]), ] with self.assertRaises(AssertionError) as error: hc_assert_that(self.mock_presult, self.pubsub_matcher) @@ -147,10 +134,9 @@ class PubSubMatcherTest(unittest.TestCase): self.assertTrue( '\nExpected: Expected 1 messages.\n but: Got 2 messages.' in str(error.exception.args[0])) + self.assertEqual(mock_sub.pull.call_count, 1) + self.assertEqual(mock_sub.acknowledge.call_count, 1) - @mock.patch('time.sleep', return_value=None) - @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.' - 'PubSubMessageMatcher._get_subscription') def test_message_matcher_timeout(self, mock_get_sub, unused_mock): self.init_matcher() mock_sub = mock_get_sub.return_value @@ -159,6 +145,7 @@ class PubSubMatcherTest(unittest.TestCase): with self.assertRaisesRegexp(AssertionError, r'Expected 1.*\n.*Got 0'): hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertTrue(mock_sub.pull.called) + self.assertEqual(mock_sub.acknowledge.call_count, 0) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 00e37f3..d410992 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -25,6 +25,7 @@ from __future__ import absolute_import import itertools import logging +import time from google.protobuf import wrappers_pb2 @@ -264,11 +265,12 @@ class _DirectReadFromPubSub(PTransform): class _DirectWriteToPubSubFn(DoFn): - _topic = None + BUFFER_SIZE_ELEMENTS = 100 + FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5 def __init__(self, sink): self.project = sink.project - self.topic_name = sink.topic_name + self.short_topic_name = sink.topic_name self.id_label = sink.id_label self.timestamp_attribute = sink.timestamp_attribute self.with_attributes = sink.with_attributes @@ -282,30 +284,33 @@ class _DirectWriteToPubSubFn(DoFn): 'supported for PubSub writes') def start_bundle(self): - from google.cloud import pubsub - - if self._topic is None: - self._topic = pubsub.Client(project=self.project).topic( - self.topic_name) self._buffer = [] def process(self, elem): self._buffer.append(elem) - if len(self._buffer) >= 100: + if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS: self._flush() def finish_bundle(self): self._flush() def _flush(self): - if self._buffer: - with self._topic.batch() as batch: - for elem in self._buffer: - if self.with_attributes: - batch.publish(elem.data, **elem.attributes) - else: - batch.publish(elem) - self._buffer = [] + from google.cloud import pubsub + pub_client = pubsub.PublisherClient() + topic = pub_client.topic_path(self.project, self.short_topic_name) + + if self.with_attributes: + futures = [pub_client.publish(topic, elem.data, **elem.attributes) + for elem in self._buffer] + else: + futures = [pub_client.publish(topic, elem) + for elem in self._buffer] + + timer_start = time.time() + for future in futures: + remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start) + future.result(remaining) + self._buffer = [] def _get_pubsub_transform_overrides(pipeline_options): diff --git a/sdks/python/apache_beam/runners/direct/test_direct_runner.py b/sdks/python/apache_beam/runners/direct/test_direct_runner.py index 8facca8..23dfeab 100644 --- a/sdks/python/apache_beam/runners/direct/test_direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/test_direct_runner.py @@ -52,5 +52,6 @@ class TestDirectRunner(DirectRunner): finally: if not PipelineState.is_terminal(self.result.state): self.result.cancel() + self.result.wait_until_finish() return self.result diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index ef12e2c..fad0704 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -377,20 +377,44 @@ class _TestStreamEvaluator(_TransformEvaluator): class _PubSubSubscriptionWrapper(object): - """Wrapper for garbage-collecting temporary PubSub subscriptions.""" + """Wrapper for managing temporary PubSub subscriptions.""" - def __init__(self, subscription, should_cleanup): - self.subscription = subscription - self.should_cleanup = should_cleanup + def __init__(self, project, short_topic_name, short_sub_name): + """Initialize subscription wrapper. + + If sub_name is None, will create a temporary subscription to topic_name. + + Args: + project: GCP project name for topic and subscription. May be None. + Required if sub_name is None. + short_topic_name: Valid topic name without + 'projects/{project}/topics/' prefix. May be None. + Required if sub_name is None. + short_sub_name: Valid subscription name without + 'projects/{project}/subscriptions/' prefix. May be None. + """ + from google.cloud import pubsub + self.sub_client = pubsub.SubscriberClient() + + if short_sub_name is None: + self.sub_name = self.sub_client.subscription_path( + project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32))) + topic_name = self.sub_client.topic_path(project, short_topic_name) + self.sub_client.create_subscription(self.sub_name, topic_name) + self._should_cleanup = True + else: + self.sub_name = self.sub_client.subscription_path(project, short_sub_name) + self._should_cleanup = False def __del__(self): - if self.should_cleanup: - self.subscription.delete() + if self._should_cleanup: + self.sub_client.delete_subscription(self.sub_name) class _PubSubReadEvaluator(_TransformEvaluator): """TransformEvaluator for PubSub read.""" + # A mapping of transform to _PubSubSubscriptionWrapper. _subscription_cache = {} def __init__(self, evaluation_context, applied_ptransform, @@ -404,26 +428,16 @@ class _PubSubReadEvaluator(_TransformEvaluator): if self.source.id_label: raise NotImplementedError( 'DirectRunner: id_label is not supported for PubSub reads') - self._subscription = _PubSubReadEvaluator.get_subscription( + self._sub_name = _PubSubReadEvaluator.get_subscription( self._applied_ptransform, self.source.project, self.source.topic_name, self.source.subscription_name) @classmethod - def get_subscription(cls, transform, project, topic, subscription_name): + def get_subscription(cls, transform, project, topic, short_sub_name): if transform not in cls._subscription_cache: - from google.cloud import pubsub - should_create = not subscription_name - if should_create: - subscription_name = 'beam_%d_%x' % ( - int(time.time()), random.randrange(1 << 32)) - wrapper = _PubSubSubscriptionWrapper( - pubsub.Client(project=project).topic(topic).subscription( - subscription_name), - should_create) - if should_create: - wrapper.subscription.create() + wrapper = _PubSubSubscriptionWrapper(project, topic, short_sub_name) cls._subscription_cache[transform] = wrapper - return cls._subscription_cache[transform].subscription + return cls._subscription_cache[transform].sub_name def start_bundle(self): pass @@ -438,28 +452,34 @@ class _PubSubReadEvaluator(_TransformEvaluator): # evaluator fails with an exception before emitting a bundle. However, # the DirectRunner currently doesn't retry work items anyway, so the # pipeline would enter an inconsistent state on any error. - with pubsub.subscription.AutoAck( - self._subscription, return_immediately=True, - max_messages=10) as results: - def _get_element(message): - parsed_message = PubsubMessage._from_message(message) - if (timestamp_attribute and - timestamp_attribute in parsed_message.attributes): - rfc3339_or_milli = parsed_message.attributes[timestamp_attribute] + sub_client = pubsub.SubscriberClient() + response = sub_client.pull(self._sub_name, max_messages=10, + return_immediately=True) + + def _get_element(message): + parsed_message = PubsubMessage._from_message(message) + if (timestamp_attribute and + timestamp_attribute in parsed_message.attributes): + rfc3339_or_milli = parsed_message.attributes[timestamp_attribute] + try: + timestamp = Timestamp.from_rfc3339(rfc3339_or_milli) + except ValueError: try: - timestamp = Timestamp.from_rfc3339(rfc3339_or_milli) - except ValueError: - try: - timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000) - except ValueError as e: - raise ValueError('Bad timestamp value: %s' % e) - else: - timestamp = Timestamp.from_rfc3339(message.service_timestamp) + timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000) + except ValueError as e: + raise ValueError('Bad timestamp value: %s' % e) + else: + timestamp = Timestamp(message.publish_time.seconds, + message.publish_time.nanos // 1000) + + return timestamp, parsed_message - return timestamp, parsed_message + results = [_get_element(rm.message) for rm in response.received_messages] + ack_ids = [rm.ack_id for rm in response.received_messages] + if ack_ids: + sub_client.acknowledge(self._sub_name, ack_ids) - return [_get_element(message) - for unused_ack_id, message in iteritems(results)] + return results def finish_bundle(self): data = self._read_from_pubsub(self.source.timestamp_attribute) diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py index 490d079..1f0e99e 100644 --- a/sdks/python/apache_beam/testing/test_utils.py +++ b/sdks/python/apache_beam/testing/test_utils.py @@ -24,11 +24,9 @@ from __future__ import absolute_import import hashlib import imp -import logging import os import shutil import tempfile -import time from builtins import object from mock import Mock @@ -136,46 +134,61 @@ def delete_files(file_paths): FileSystems.delete(file_paths) -def wait_for_subscriptions_created(subs, timeout=60): - """Wait for all PubSub subscriptions are created.""" - return _wait_until_all_exist(subs, timeout) - +def cleanup_subscriptions(sub_client, subs): + """Cleanup PubSub subscriptions if exist.""" + for sub in subs: + sub_client.delete_subscription(sub.name) -def wait_for_topics_created(topics, timeout=60): - """Wait for all PubSub topics are created.""" - return _wait_until_all_exist(topics, timeout) +def cleanup_topics(pub_client, topics): + """Cleanup PubSub topics if exist.""" + for topic in topics: + pub_client.delete_topic(topic.name) -def _wait_until_all_exist(components, timeout): - unchecked_components = set(components) - start_time = time.time() - while time.time() - start_time <= timeout: - unchecked_components = set( - [c for c in unchecked_components if not c.exists()]) - if len(unchecked_components) == 0: - return True - time.sleep(2) - raise RuntimeError( - 'Timeout after %d seconds. %d of %d topics/subscriptions not exist. ' - 'They are %s.' % (timeout, len(unchecked_components), - len(components), list(unchecked_components))) +class PullResponseMessage(object): + """Data representing a pull request response. + Utility class for ``create_pull_response``. + """ + def __init__(self, data, attributes=None, + publish_time_secs=None, publish_time_nanos=None, ack_id=None): + self.data = data + self.attributes = attributes + self.publish_time_secs = publish_time_secs + self.publish_time_nanos = publish_time_nanos + self.ack_id = ack_id -def cleanup_subscriptions(subs): - """Cleanup PubSub subscriptions if exist.""" - _cleanup_pubsub(subs) +def create_pull_response(responses): + """Create an instance of ``google.cloud.pubsub.types.ReceivedMessage``. -def cleanup_topics(topics): - """Cleanup PubSub topics if exist.""" - _cleanup_pubsub(topics) + Used to simulate the response from pubsub.SubscriberClient().pull(). + Args: + responses: list of ``PullResponseMessage`` -def _cleanup_pubsub(components): - for c in components: - if c.exists(): - c.delete() - else: - logging.debug('Cannot delete topic/subscription. %s does not exist.', - c.full_name) + Returns: + An instance of ``google.cloud.pubsub.types.PullResponse`` populated with + responses. + """ + from google.cloud import pubsub + + res = pubsub.types.PullResponse() + for response in responses: + received_message = res.received_messages.add() + + message = received_message.message + message.data = response.data + if response.attributes is not None: + for k, v in response.attributes.items(): + message.attributes[k] = v + if response.publish_time_secs is not None: + message.publish_time.seconds = response.publish_time_secs + if response.publish_time_nanos is not None: + message.publish_time.nanos = response.publish_time_nanos + + if response.ack_id is not None: + received_message.ack_id = response.ack_id + + return res diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py index bef4078..2b16c30 100644 --- a/sdks/python/apache_beam/testing/test_utils_test.py +++ b/sdks/python/apache_beam/testing/test_utils_test.py @@ -82,56 +82,19 @@ class TestUtilsTest(unittest.TestCase): self.assertEqual(f.readline(), b'line2\n') self.assertEqual(f.readline(), b'line3\n') - @mock.patch('time.sleep', return_value=None) - def test_wait_for_subscriptions_created_fails(self, patched_time_sleep): - sub1 = mock.MagicMock() - sub1.exists.return_value = True - sub2 = mock.MagicMock() - sub2.exists.return_value = False - with self.assertRaises(RuntimeError) as error: - utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1) - self.assertTrue(sub1.exists.called) - self.assertTrue(sub2.exists.called) - self.assertTrue(error.exception.args[0].startswith('Timeout after')) - - @mock.patch('time.sleep', return_value=None) - def test_wait_for_topics_created_fails(self, patched_time_sleep): - topic1 = mock.MagicMock() - topic1.exists.return_value = True - topic2 = mock.MagicMock() - topic2.exists.return_value = False - with self.assertRaises(RuntimeError) as error: - utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1) - self.assertTrue(topic1.exists.called) - self.assertTrue(topic2.exists.called) - self.assertTrue(error.exception.args[0].startswith('Timeout after')) - - @mock.patch('time.sleep', return_value=None) - def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep): - sub1 = mock.MagicMock() - sub1.exists.return_value = True - self.assertTrue( - utils.wait_for_subscriptions_created([sub1], timeout=0.1)) - - @mock.patch('time.sleep', return_value=None) - def test_wait_for_topics_created_succeeds(self, patched_time_sleep): - topic1 = mock.MagicMock() - topic1.exists.return_value = True - self.assertTrue( - utils.wait_for_subscriptions_created([topic1], timeout=0.1)) - self.assertTrue(topic1.exists.called) - def test_cleanup_subscriptions(self): - mock_sub = mock.MagicMock() - mock_sub.exist.return_value = True - utils.cleanup_subscriptions([mock_sub]) - self.assertTrue(mock_sub.delete.called) + sub_client = mock.Mock() + sub = mock.Mock() + sub.name = 'test_sub' + utils.cleanup_subscriptions(sub_client, [sub]) + sub_client.delete_subscription.assert_called_with(sub.name) def test_cleanup_topics(self): - mock_topics = mock.MagicMock() - mock_topics.exist.return_value = True - utils.cleanup_subscriptions([mock_topics]) - self.assertTrue(mock_topics.delete.called) + pub_client = mock.Mock() + topic = mock.Mock() + topic.name = 'test_topic' + utils.cleanup_topics(pub_client, [topic]) + pub_client.delete_topic.assert_called_with(topic.name) if __name__ == '__main__': diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt index a32bc44..8e352a6 100644 --- a/sdks/python/container/base_image_requirements.txt +++ b/sdks/python/container/base_image_requirements.txt @@ -47,10 +47,9 @@ nose==1.3.7 # GCP extra features google-apitools==0.5.20 googledatastore==7.0.1 -google-cloud-pubsub==0.26.0 +google-cloud-pubsub==0.35.4 google-cloud-bigquery==0.25.0 proto-google-cloud-datastore-v1==0.90.4 -proto-google-cloud-pubsub-v1==0.15.4 # Optional packages cython==0.28.1 diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 1047167..a3db790 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -141,8 +141,7 @@ GCP_REQUIREMENTS = [ 'google-apitools>=0.5.18,<=0.5.20', 'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4', 'googledatastore==7.0.1; python_version < "3.0"', - 'google-cloud-pubsub==0.26.0', - 'proto-google-cloud-pubsub-v1==0.15.4', + 'google-cloud-pubsub==0.35.4', # GCP packages required by tests 'google-cloud-bigquery==0.25.0', ]