This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 776fd5a [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874) 776fd5a is described below commit 776fd5a6ae21352a20c388ecf23822e6def13854 Author: Mark Liu <markflyh...@users.noreply.github.com> AuthorDate: Tue Mar 20 18:58:52 2018 -0700 [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874) Create Python End-to-end Test for Streaming WordCount --- .../apache_beam/examples/streaming_wordcount.py | 2 +- .../examples/streaming_wordcount_it_test.py | 102 +++++++++++++++++++++ .../runners/dataflow/test_dataflow_runner.py | 37 +++++++- sdks/python/apache_beam/testing/test_pipeline.py | 4 +- sdks/python/apache_beam/testing/test_utils.py | 48 ++++++++++ sdks/python/apache_beam/testing/test_utils_test.py | 55 ++++++++++- 6 files changed, 242 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 12f7351..7ef95d8 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -36,7 +36,7 @@ from apache_beam.options.pipeline_options import StandardOptions def split_fn(lines): import re - return re.findall(r'[A-Za-z\']+', lines) + return re.findall(r'[A-Za-z0-9\']+', lines) def run(argv=None): diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py new file mode 100644 index 0000000..a95e5fa --- /dev/null +++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeline in Python SDK +is in development and is not yet available for use. + +Currently, this test blocks until the job is manually terminated. +""" + +import logging +import unittest + +from hamcrest.core.core.allof import all_of +from nose.plugins.attrib import attr + +from apache_beam.examples import streaming_wordcount +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline + +INPUT_TOPIC = 'wc_topic_input' +OUTPUT_TOPIC = 'wc_topic_output' +INPUT_SUB = 'wc_subscription_input' +OUTPUT_SUB = 'wc_subscription_output' + +DEFAULT_INPUT_NUMBERS = 500 + + +class StreamingWordCountIT(unittest.TestCase): + + def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + + # Set up PubSub environment. 
+ from google.cloud import pubsub + self.pubsub_client = pubsub.Client( + project=self.test_pipeline.get_option('project')) + self.input_topic = self.pubsub_client.topic(INPUT_TOPIC) + self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC) + self.input_sub = self.input_topic.subscription(INPUT_SUB) + self.output_sub = self.output_topic.subscription(OUTPUT_SUB) + + self._cleanup_pubsub() + + self.input_topic.create() + self.output_topic.create() + test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) + self.input_sub.create() + self.output_sub.create() + + def _inject_numbers(self, topic, num_messages): + """Inject numbers as test data to PubSub.""" + logging.debug('Injecting %d numbers to topic %s', + num_messages, topic.full_name) + for n in range(num_messages): + topic.publish(str(n)) + + def _cleanup_pubsub(self): + test_utils.cleanup_subscriptions([self.input_sub, self.output_sub]) + test_utils.cleanup_topics([self.input_topic, self.output_topic]) + + def tearDown(self): + self._cleanup_pubsub() + + @attr('developing_test') + def test_streaming_wordcount_it(self): + # Set extra options to the pipeline for test purpose + pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)] + extra_opts = {'input_sub': self.input_sub.full_name, + 'output_topic': self.output_topic.full_name, + 'on_success_matcher': all_of(*pipeline_verifiers)} + + # Generate input data and inject to PubSub. + test_utils.wait_for_subscriptions_created([self.input_sub]) + self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS) + + # Get pipeline options from command argument: --test-pipeline-options, + # and start pipeline job by calling pipeline main function. 
+ streaming_wordcount.run( + self.test_pipeline.get_full_options_as_args(**extra_opts)) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.DEBUG) + unittest.main() diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index aad3fc7..09a9190 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -18,13 +18,19 @@ """Wrapper of Beam runners that's built for running and verifying e2e tests.""" from __future__ import print_function +import time + from apache_beam.internal import pickler from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TestOptions from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner +from apache_beam.runners.runner import PipelineState __all__ = ['TestDataflowRunner'] +WAIT_TIMEOUT = 2 * 60 + class TestDataflowRunner(DataflowRunner): def run_pipeline(self, pipeline): @@ -46,10 +52,39 @@ class TestDataflowRunner(DataflowRunner): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) - self.result.wait_until_finish() + + if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() + else: + # TODO: Ideally, we want to wait until workers start successfully. 
+ self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): + return job_state in [ + PipelineState.STOPPED, PipelineState.DONE, + PipelineState.FAILED, PipelineState.CANCELLED, + PipelineState.UPDATED, PipelineState.DRAINED, + ] + + def wait_until_running(self): + """Wait until the Dataflow pipeline terminates or enters the RUNNING state.""" + if not self.result.has_job: + raise IOError('Failed to get the Dataflow job id.') + + start_time = time.time() + while time.time() - start_time <= WAIT_TIMEOUT: + job_state = self.result.state + if (self._is_in_terminate_state(job_state) or + self.result.state == PipelineState.RUNNING): + return job_state + time.sleep(5) + + raise RuntimeError('Timeout after %d seconds while waiting for job %s ' + 'enters RUNNING or terminate state.' % + (WAIT_TIMEOUT, self.result.job_id)) diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py index 46eeb75..155190c 100644 --- a/sdks/python/apache_beam/testing/test_pipeline.py +++ b/sdks/python/apache_beam/testing/test_pipeline.py @@ -98,8 +98,8 @@ class TestPipeline(Pipeline): options = PipelineOptions(self.options_list) super(TestPipeline, self).__init__(runner, options) - def run(self): - result = super(TestPipeline, self).run() + def run(self, test_runner_api=True): + result = super(TestPipeline, self).run(test_runner_api) if self.blocking: state = result.wait_until_finish() assert state == PipelineState.DONE, "Pipeline execution failed." diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py index 5676186..f84b7f0 100644 --- a/sdks/python/apache_beam/testing/test_utils.py +++ b/sdks/python/apache_beam/testing/test_utils.py @@ -22,9 +22,11 @@ For internal use only; no backwards-compatibility guarantees. 
import hashlib import imp +import logging import os import shutil import tempfile +import time from mock import Mock from mock import patch @@ -129,3 +131,49 @@ def delete_files(file_paths): raise RuntimeError('Clean up failed. Invalid file path: %s.' % file_paths) FileSystems.delete(file_paths) + + +def wait_for_subscriptions_created(subs, timeout=60): + """Wait until all PubSub subscriptions are created.""" + return _wait_until_all_exist(subs, timeout) + + +def wait_for_topics_created(topics, timeout=60): + """Wait until all PubSub topics are created.""" + return _wait_until_all_exist(topics, timeout) + + +def _wait_until_all_exist(components, timeout): + needs_wait = set(components) + start_time = time.time() + while time.time() - start_time <= timeout: + for c in components: + if c in needs_wait and c.exists(): + needs_wait.remove(c) + if len(needs_wait) == 0: + return True + time.sleep(2) + + raise RuntimeError( + 'Timeout after %d seconds. %d of %d topics/subscriptions not exist. ' + 'They are %s.' % + (timeout, len(needs_wait), len(components), list(needs_wait))) + + +def cleanup_subscriptions(subs): + """Clean up PubSub subscriptions if they exist.""" + _cleanup_pubsub(subs) + + +def cleanup_topics(topics): + """Clean up PubSub topics if they exist.""" + _cleanup_pubsub(topics) + + +def _cleanup_pubsub(components): + for c in components: + if c.exists(): + c.delete() + else: + logging.debug('Cannot delete topic/subscription. 
%s does not exist.', + c.full_name) diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py index 0018c0e..ba0b940 100644 --- a/sdks/python/apache_beam/testing/test_utils_test.py +++ b/sdks/python/apache_beam/testing/test_utils_test.py @@ -22,6 +22,8 @@ import os import tempfile import unittest +import mock + from apache_beam.io.filesystem import BeamIOError from apache_beam.io.filesystems import FileSystems from apache_beam.testing import test_utils as utils @@ -57,8 +59,6 @@ class TestUtilsTest(unittest.TestCase): utils.delete_files([]) def test_temp_dir_removes_files(self): - dir_path = '' - file_path = '' with utils.TempDir() as tempdir: dir_path = tempdir.get_path() file_path = tempdir.create_temp_file() @@ -80,6 +80,57 @@ class TestUtilsTest(unittest.TestCase): self.assertEqual(f.readline(), 'line2\n') self.assertEqual(f.readline(), 'line3\n') + @mock.patch('time.sleep', return_value=None) + def test_wait_for_subscriptions_created_fails(self, patched_time_sleep): + sub1 = mock.MagicMock() + sub1.exists.return_value = True + sub2 = mock.MagicMock() + sub2.exists.return_value = False + with self.assertRaises(RuntimeError) as error: + utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1) + self.assertTrue(sub1.exists.called) + self.assertTrue(sub2.exists.called) + self.assertTrue(error.exception.args[0].startswith('Timeout after')) + + @mock.patch('time.sleep', return_value=None) + def test_wait_for_topics_created_fails(self, patched_time_sleep): + topic1 = mock.MagicMock() + topic1.exists.return_value = True + topic2 = mock.MagicMock() + topic2.exists.return_value = False + with self.assertRaises(RuntimeError) as error: + utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1) + self.assertTrue(topic1.exists.called) + self.assertTrue(topic2.exists.called) + self.assertTrue(error.exception.args[0].startswith('Timeout after')) + + @mock.patch('time.sleep', return_value=None) + def 
test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep): + sub1 = mock.MagicMock() + sub1.exists.return_value = True + self.assertTrue( + utils.wait_for_subscriptions_created([sub1], timeout=0.1)) + + @mock.patch('time.sleep', return_value=None) + def test_wait_for_topics_created_succeeds(self, patched_time_sleep): + topic1 = mock.MagicMock() + topic1.exists.return_value = True + self.assertTrue( + utils.wait_for_subscriptions_created([topic1], timeout=0.1)) + self.assertTrue(topic1.exists.called) + + def test_cleanup_subscriptions(self): + mock_sub = mock.MagicMock() + mock_sub.exist.return_value = True + utils.cleanup_subscriptions([mock_sub]) + self.assertTrue(mock_sub.delete.called) + + def test_cleanup_topics(self): + mock_topics = mock.MagicMock() + mock_topics.exist.return_value = True + utils.cleanup_subscriptions([mock_topics]) + self.assertTrue(mock_topics.delete.called) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) -- To stop receiving notification emails like this one, please contact al...@apache.org.