[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83703=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83703 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 17:28
Start Date: 23/Mar/18 17:28
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375741821

Yes, I created a jira [BEAM-3922](https://issues.apache.org/jira/browse/BEAM-3922) to track the PostCommit failure.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 83703)
Time Spent: 8h 50m (was: 8h 40m)

> Build test infra for end-to-end streaming test in Python SDK
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
> Issue Type: Task
> Components: testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 8h 50m
> Remaining Estimate: 0h

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83702=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83702 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 17:28
Start Date: 23/Mar/18 17:28
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375741821

Yes, I created a jira [BEAM-3922](https://issues.apache.org/jira/browse/BEAM-3922) to track the PostCommit failure.

Issue Time Tracking
---

Worklog Id: (was: 83702)
Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83704=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83704 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 17:28
Start Date: 23/Mar/18 17:28
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375741821

Yes, I created a jira [BEAM-3922](https://issues.apache.org/jira/browse/BEAM-3922) to track this PostCommit failure.

Issue Time Tracking
---

Worklog Id: (was: 83704)
Time Spent: 9h (was: 8h 50m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83692=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83692 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 17:22
Start Date: 23/Mar/18 17:22
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index a95e5fa8f53..04e6e4e7370 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -23,13 +23,17 @@
 Currently, this test blocks until the job is manually terminated.
 """
 
+import datetime
 import logging
+import random
 import unittest
+import uuid
 
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
 from apache_beam.examples import streaming_wordcount
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
 from apache_beam.runners.runner import PipelineState
 from apache_beam.testing import test_utils
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -47,17 +51,16 @@ class StreamingWordCountIT(unittest.TestCase):
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.uuid = str(uuid.uuid4())
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(
-        project=self.test_pipeline.get_option('project'))
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
-
-    self._cleanup_pubsub()
+    self.pubsub_client = pubsub.Client(project=self.project)
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
 
     self.input_topic.create()
     self.output_topic.create()
@@ -65,6 +68,11 @@ def setUp(self):
     self.input_sub.create()
     self.output_sub.create()
 
+  def _generate_identifier(self):
+    seed = random.randint(0, 999)
+    current_time = datetime.datetime.now().strftime('%m%d%H%M%S')
+    return '%s%d' % (current_time, seed)
+
   def _inject_numbers(self, topic, num_messages):
     """Inject numbers as test data to PubSub."""
     logging.debug('Injecting %d numbers to topic %s',
@@ -79,13 +87,21 @@ def _cleanup_pubsub(self):
   def tearDown(self):
     self._cleanup_pubsub()
 
-  @attr('developing_test')
+  @attr('IT')
   def test_streaming_wordcount_it(self):
+    # Build expected dataset.
+    expected_msg = [('%d: 1' % num) for num in range(DEFAULT_INPUT_NUMBERS)]
+
     # Set extra options to the pipeline for test purpose
-    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
-    extra_opts = {'input_sub': self.input_sub.full_name,
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+    pubsub_msg_verifier = PubSubMessageMatcher(self.project,
+                                               OUTPUT_SUB + self.uuid,
+                                               expected_msg,
+                                               timeout=400)
+    extra_opts = {'input_subscription': self.input_sub.full_name,
                   'output_topic': self.output_topic.full_name,
-                  'on_success_matcher': all_of(*pipeline_verifiers)}
+                  'on_success_matcher': all_of(state_verifier,
+                                               pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
     test_utils.wait_for_subscriptions_created([self.input_sub])
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
new file mode 100644
index 000..8fb687908d8
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional
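The test expects one `'<number>: 1'` line per injected number, and the new `PubSubMessageMatcher`'s `_matches` method compares received and expected messages with `collections.Counter`, so arrival order does not matter but duplicates do. The stdlib-only sketch below illustrates that comparison; `DEFAULT_INPUT_NUMBERS = 5` and the helper names `build_expected_messages` and `messages_match` are assumptions for illustration, not values or functions from the PR.

```python
from collections import Counter

# Assumed value for illustration; the real test module defines its own
# DEFAULT_INPUT_NUMBERS, which the diff does not show.
DEFAULT_INPUT_NUMBERS = 5


def build_expected_messages(num_inputs):
    """Each injected number should be counted exactly once."""
    return ['%d: 1' % num for num in range(num_inputs)]


def messages_match(received, expected):
    """Order-insensitive comparison that still counts duplicates."""
    return Counter(received) == Counter(expected)


expected = build_expected_messages(DEFAULT_INPUT_NUMBERS)
print(expected)                                       # ['0: 1', '1: 1', '2: 1', '3: 1', '4: 1']
print(messages_match(expected[::-1], expected))       # True: order is ignored
print(messages_match(expected + ['0: 1'], expected))  # False: the duplicate is counted
```

A plain `set` comparison would accept the duplicated `'0: 1'` in the last call. Since Pub/Sub may redeliver messages, a multiset comparison is the stricter choice for this kind of end-to-end check.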
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83691=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83691 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 17:21
Start Date: 23/Mar/18 17:21
Worklog Time Spent: 10m
Work Description: aaltay commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375739739

Thank you @markflyhigh, I will merge this. It looks like there is a bug related to the recently introduced https://github.com/apache/beam/pull/4781 (I will file the bug.)

Issue Time Tracking
---

Worklog Id: (was: 83691)
Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83628=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83628 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 15:54
Start Date: 23/Mar/18 15:54
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375712049

From the [console log](https://builds.apache.org/job/beam_PostCommit_Python_Verify/4487/consoleFull):
```
test_streaming_wordcount_it (apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT) ... ok
test_wordcount_fnapi_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok
test_bigquery_tornadoes_it (apache_beam.examples.cookbook.bigquery_tornadoes_it_test.BigqueryTornadoesIT) ... ok
test_wordcount_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ERROR
```
`test_wordcount_it` failed with a "no attribute" error, which has been causing the Jenkins PostCommit to fail since Mar 21st. I haven't had a chance to look into it yet, but it doesn't affect `test_streaming_wordcount_it`.

Issue Time Tracking
---

Worklog Id: (was: 83628)
Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83485=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83485 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 06:23
Start Date: 23/Mar/18 06:23
Worklog Time Spent: 10m
Work Description: aaltay commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375556990

Do you know why the PostCommit tests failed?

Issue Time Tracking
---

Worklog Id: (was: 83485)
Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83467=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83467 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 05:12
Start Date: 23/Mar/18 05:12
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375547198

Run Python PostCommit

Issue Time Tracking
---

Worklog Id: (was: 83467)
Time Spent: 7h 40m (was: 7.5h)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83413=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83413 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 00:52
Start Date: 23/Mar/18 00:52
Worklog Time Spent: 10m
Work Description: aaltay commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375504394

LGTM. Please squash, I can merge after tests pass.

Issue Time Tracking
---

Worklog Id: (was: 83413)
Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83396 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 00:05
Start Date: 23/Mar/18 00:05
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375497207

@aaltay PTAL

Issue Time Tracking
---

Worklog Id: (was: 83396)
Time Spent: 7h 20m (was: 7h 10m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83395=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83395 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 00:03
Start Date: 23/Mar/18 00:03
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375496804

Run Python PostCommit

Issue Time Tracking
---

Worklog Id: (was: 83395)
Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83393=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83393 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 00:01
Start Date: 23/Mar/18 00:01
Worklog Time Spent: 10m
Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176607403

## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,

Review comment:
done.

Issue Time Tracking
---

Worklog Id: (was: 83393)
Time Spent: 6h 50m (was: 6h 40m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83394=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83394 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 00:01
Start Date: 23/Mar/18 00:01
Worklog Time Spent: 10m
Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176607424

## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription."""
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        subscription.acknowledge([ack_id])
+      if len(total_messages) >= expected_num:
+        return total_messages
+      time.sleep(1)
+
+    raise RuntimeError('Timeout after %d sec. Received %d messages from %s.' %
+                       (timeout, len(total_messages), subscription.full_name))
+
+  def describe_to(self, description):
+    description.append_text(
+        'Expected %d messages.' % len(self.expected_msg))
+
+  def describe_mismatch(self, _, mismatch_description):
+    diff = set(self.expected_msg) - set(self.messages)

Review comment:
done
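`describe_mismatch` in this revision reports `set(self.expected_msg) - set(self.messages)`. A set difference drops duplicates (a message expected twice but received once is not reported) and never mentions unexpected extra messages. One possible alternative, sketched below as a standalone helper, subtracts `collections.Counter` objects in both directions; the name `describe_message_mismatch` and its output format are hypothetical, not part of the PR.

```python
from collections import Counter


def describe_message_mismatch(expected, received):
    """Summarize missing and unexpected messages, counting duplicates.

    Counter subtraction keeps only positive counts, so each direction lists
    exactly the surplus on one side.
    """
    received = received or []  # Treat "nothing pulled yet" as empty.
    missing = Counter(expected) - Counter(received)
    unexpected = Counter(received) - Counter(expected)
    return 'Missing: %s; unexpected: %s.' % (
        sorted(missing.elements()), sorted(unexpected.elements()))


print(describe_message_mismatch(['a: 1', 'b: 1', 'b: 1'], ['b: 1', 'c: 1']))
# Missing: ['a: 1', 'b: 1']; unexpected: ['c: 1'].
```

In a hamcrest matcher the returned string would be passed to `mismatch_description.append_text(...)`, the same call the matcher already uses on `description` in `describe_to`.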
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83392 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 23/Mar/18 00:01
Start Date: 23/Mar/18 00:01
Worklog Time Spent: 10m
Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176607385

## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -47,24 +50,32 @@ class StreamingWordCountIT(unittest.TestCase):
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.identifier = self._generate_identifier()

Review comment:
done.

Issue Time Tracking
---

Worklog Id: (was: 83392)
Time Spent: 6h 40m (was: 6.5h)
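This review thread discusses naming the test's Pub/Sub resources with `self.identifier = self._generate_identifier()`. The `_generate_identifier` method in the merged diff concatenates a `%m%d%H%M%S` timestamp with a random integer in `[0, 999]`. The standalone sketch below reproduces that scheme; the injectable `now` and `rng` parameters and the `INPUT_TOPIC` prefix value are assumptions added so the example is deterministic, not code from the PR.

```python
import datetime
import random


def generate_identifier(now=None, rng=random):
    """Return an MMDDhhmmss timestamp followed by a random int in [0, 999]."""
    if now is None:
        now = datetime.datetime.now()
    seed = rng.randint(0, 999)
    return '%s%d' % (now.strftime('%m%d%H%M%S'), seed)


# Hypothetical prefix; the test module defines its own INPUT_TOPIC value.
INPUT_TOPIC = 'wc_topic_input'

fixed_time = datetime.datetime(2018, 3, 23, 17, 28, 5)
identifier = generate_identifier(now=fixed_time, rng=random.Random(42))
topic_name = INPUT_TOPIC + identifier  # 'wc_topic_input0323172805' + seed
print(topic_name)
```

Compared with `uuid.uuid4()`, this suffix is shorter and shows when a run started, which can make leaked topics easier to trace. However, two runs that start in the same second and draw the same number would collide.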
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83391=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83391 ]

ASF GitHub Bot logged work on BEAM-3861:

Author: ASF GitHub Bot
Created on: 22/Mar/18 23:56
Start Date: 22/Mar/18 23:56
Worklog Time Spent: 10m
Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176606739

## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()

Review comment:
I missed the success matchers above. This is fine as it is.

Issue Time Tracking
---

Worklog Id: (was: 83391)
Time Spent: 6.5h (was: 6h 20m)
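For streaming jobs, the hunk above waits for `PipelineState.RUNNING`, runs the success matcher, and then calls `self.result.cancel()`. The sketch below models that sequence with stdlib code only. `FakeResult`, `wait_until_in_state`, and `run_streaming_check` are hypothetical stand-ins, not Beam's classes or method signatures. The sketch also shows one possible design variation: wrapping verification in `try`/`finally` so the streaming job is cancelled even when the matcher raises. In the hunk shown, a failed `hc_assert_that` would skip the `cancel()` call.

```python
import time


class FakeResult(object):
    """Hypothetical pipeline result that walks through scripted states."""

    def __init__(self, states):
        self._states = list(states)
        self.cancelled = False

    @property
    def state(self):
        # Report the next scripted state; stay on the last one once reached.
        if len(self._states) > 1:
            return self._states.pop(0)
        return self._states[0]

    def cancel(self):
        self.cancelled = True


def wait_until_in_state(result, expected_state, timeout=600, poll_interval=5,
                        clock=time.time, sleep=time.sleep):
    """Poll result.state until it equals expected_state or timeout expires."""
    start = clock()
    while clock() - start <= timeout:
        if result.state == expected_state:
            return
        sleep(poll_interval)
    raise RuntimeError('Pipeline did not reach %s within %d sec.'
                       % (expected_state, timeout))


def run_streaming_check(result, verify, **wait_kwargs):
    """Wait for RUNNING, verify, and always cancel the streaming job."""
    wait_until_in_state(result, 'RUNNING', **wait_kwargs)
    try:
        verify(result)
    finally:
        result.cancel()


result = FakeResult(['PENDING', 'PENDING', 'RUNNING'])
run_streaming_check(result, lambda r: None, sleep=lambda _: None)
print(result.cancelled)  # True
```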
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83338=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83338 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 21:04 Start Date: 22/Mar/18 21:04 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176571812 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + +DEFAULT_TIMEOUT = 5 * 60 +MAX_MESSAGES_IN_ONE_PULL = 50 + + +class PubSubMessageMatcher(BaseMatcher): + """Matcher that verifies messages from given subscription. + + This matcher can block the test and keep pulling messages from given + subscription until all expected messages are shown or timeout. + """ + + def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT): +"""Initialize PubSubMessageMatcher object. + +Args: + project: A name string of project. + sub_name: A name string of subscription which is attached to output. + expected_msg: A string list that contains expected message data pulled +from the subscription. + timeout: Timeout in seconds to wait for all expected messages appears. +""" +if pubsub is None: + raise ImportError( + 'PubSub dependencies are not installed.') +if not project: + raise ValueError('Invalid project %s.' % project) +if not sub_name: + raise ValueError('Invalid subscription %s.' % sub_name) +if not expected_msg or not isinstance(expected_msg, list): + raise ValueError('Invalid expected messages %s.' % expected_msg) + +self.project = project +self.sub_name = sub_name +self.expected_msg = expected_msg +self.timeout = timeout +self.messages = None + + def _matches(self, _): +if not self.messages: + subscription = (pubsub + .Client(project=self.project) + .subscription(self.sub_name)) + self.messages = self._wait_for_messages(subscription,
Review comment: Just realized that this method returns the received messages, so it will return them instead of raising an exception in all cases.
Issue Time Tracking --- Worklog Id: (was: 83338) Time Spent: 6h 20m (was: 6h 10m)
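A pull loop that always returns what it received, instead of raising on timeout, could look roughly like the sketch below. It is a stand-alone illustration, not the code from PR #4930: `FakeSubscription` and `FakeClock` are hypothetical stand-ins (the fake yields `(ack_id, data)` pairs directly, whereas the quoted code reads `message.data`), and the clock and sleep functions are injected so the timeout path runs instantly.

```python
import time


class FakeSubscription:
    """Hypothetical stand-in for a Pub/Sub subscription.

    pull() hands out one pre-recorded batch of (ack_id, data) pairs per
    call; max_messages is accepted but ignored by this fake.
    """

    def __init__(self, batches):
        self.batches = list(batches)
        self.acked = []

    def pull(self, max_messages):
        return self.batches.pop(0) if self.batches else []

    def acknowledge(self, ack_ids):
        self.acked.extend(ack_ids)


class FakeClock:
    """Manual clock so the timeout path can be exercised instantly."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, secs):
        self.now += secs


def wait_for_messages(subscription, expected_num, timeout,
                      clock=time.time, sleep=time.sleep, max_pull=50):
    """Pull until expected_num messages arrive or timeout seconds pass.

    Returns whatever was received in both cases instead of raising, so
    the caller (e.g. a matcher's _matches) can treat a short result as
    a mismatch.
    """
    received = []
    start = clock()
    while clock() - start <= timeout:
        for ack_id, data in subscription.pull(max_messages=max_pull):
            received.append(data)
            subscription.acknowledge([ack_id])
        if len(received) >= expected_num:
            break
        sleep(1)
    return received


clock = FakeClock()
sub = FakeSubscription([[('a1', b'x')], [], [('a2', b'y')]])
print(wait_for_messages(sub, expected_num=3, timeout=5,
                        clock=clock.time, sleep=clock.sleep))  # [b'x', b'y']
```

Here only two of the three expected messages ever arrive, so the loop runs until the timeout and hands back the partial list rather than throwing.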
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83312=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83312 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 19:54 Start Date: 22/Mar/18 19:54 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176552990 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline): self.result = super(TestDataflowRunner, self).run_pipeline(pipeline) if self.result.has_job: - project = pipeline._options.view_as(GoogleCloudOptions).project - region_id = pipeline._options.view_as(GoogleCloudOptions).region - job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. - print ( - 'Found: https://console.cloud.google.com/dataflow/jobsDetail' - '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) + print('Found: %s.' % self.build_console_url(pipeline.options)) if not options.view_as(StandardOptions).streaming: self.result.wait_until_finish() else: - # TODO: Ideally, we want to wait until workers start successfully. - self.wait_until_running() + self.wait_until_in_state(PipelineState.RUNNING) if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) +if options.view_as(StandardOptions).streaming: + self.result.cancel()
Review comment: But if you cancel before you start the verification process, the pipeline may not have been able to process the data yet. I think we can simply sleep a bit before calling cancel. (Also note that the pipeline reaches the RUNNING state before the workers are actually doing meaningful work; they still need to install the SDK, etc.)
Issue Time Tracking --- Worklog Id: (was: 83312) Time Spent: 6h 10m (was: 6h)
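The "sleep a bit before calling cancel" suggestion might be sketched as follows. `cancel_after_grace_period` and `FakeResult` are hypothetical names for illustration only; `FakeResult` mimics just the `cancel()` method that the quoted `TestDataflowRunner` code calls on `self.result`, and the grace period is an arbitrary value, not one agreed on in this thread.

```python
import time


class FakeResult:
    """Hypothetical stand-in for a pipeline result exposing cancel()."""

    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True


def cancel_after_grace_period(result, verify=None, grace_secs=60,
                              sleep=time.sleep):
    """Give a streaming job time to process data, verify, then cancel.

    grace_secs is an illustrative default. cancel() runs in `finally`,
    so a failing verification never leaves the streaming job running.
    """
    try:
        sleep(grace_secs)      # RUNNING is reached before workers do real work
        if verify is not None:
            verify(result)     # e.g. a (possibly blocking) success matcher
    finally:
        result.cancel()


events = []
result = FakeResult()
cancel_after_grace_period(result, verify=lambda r: events.append('verify'),
                          grace_secs=30,
                          sleep=lambda s: events.append('slept %ds' % s))
print(events, result.cancelled)   # ['slept 30s', 'verify'] True
```

Putting `cancel()` in a `finally` block is a design choice of this sketch: an unbounded streaming job should be torn down whether or not verification passes.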
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83283=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83283 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 18:37 Start Date: 22/Mar/18 18:37 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176530410 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline): self.result = super(TestDataflowRunner, self).run_pipeline(pipeline) if self.result.has_job: - project = pipeline._options.view_as(GoogleCloudOptions).project - region_id = pipeline._options.view_as(GoogleCloudOptions).region - job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. - print ( - 'Found: https://console.cloud.google.com/dataflow/jobsDetail' - '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) + print('Found: %s.' % self.build_console_url(pipeline.options)) if not options.view_as(StandardOptions).streaming: self.result.wait_until_finish() else: - # TODO: Ideally, we want to wait until workers start successfully. - self.wait_until_running() + self.wait_until_in_state(PipelineState.RUNNING) if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) +if options.view_as(StandardOptions).streaming: + self.result.cancel()
Review comment: I see. A custom matcher is probably one approach for this. With `PubSubMessageMatcher`, verification blocks until the messages are pulled (or a timeout occurs), so the wait logic inside `on_success_matcher` gives the pipeline time to process the data. However, if no `on_success_matcher` is provided, cancel will be called immediately after setup. Do we want a waiting step here as a general step of the test?
Issue Time Tracking --- Worklog Id: (was: 83283) Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83260=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83260 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 17:42 Start Date: 22/Mar/18 17:42 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176489484 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + +DEFAULT_TIMEOUT = 5 * 60 +MAX_MESSAGES_IN_ONE_PULL = 50 + + +class PubSubMessageMatcher(BaseMatcher): + """Matcher that verifies messages from given subscription. + + This matcher can block the test and keep pulling messages from given + subscription until all expected messages are shown or timeout. + """ + + def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT): +"""Initialize PubSubMessageMatcher object. + +Args: + project: A name string of project. + sub_name: A name string of subscription which is attached to output. + expected_msg: A string list that contains expected message data pulled +from the subscription. + timeout: Timeout in seconds to wait for all expected messages appears. +""" +if pubsub is None: + raise ImportError( + 'PubSub dependencies are not installed.') +if not project: + raise ValueError('Invalid project %s.' % project) +if not sub_name: + raise ValueError('Invalid subscription %s.' % sub_name) +if not expected_msg or not isinstance(expected_msg, list): + raise ValueError('Invalid expected messages %s.' 
% expected_msg) + +self.project = project +self.sub_name = sub_name +self.expected_msg = expected_msg +self.timeout = timeout +self.messages = None + + def _matches(self, _): +if not self.messages: + subscription = (pubsub + .Client(project=self.project) + .subscription(self.sub_name)) + self.messages = self._wait_for_messages(subscription, + len(self.expected_msg), + self.timeout) +return Counter(self.messages) == Counter(self.expected_msg) + + def _wait_for_messages(self, subscription, expected_num, timeout): +"""Wait for messages from given subscription.""" +logging.debug('Start pulling messages from %s', subscription.full_name) +total_messages = [] +start_time = time.time() +while time.time() - start_time <= timeout: + pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL) + for ack_id, message in pulled: +total_messages.append(message.data) +subscription.acknowledge([ack_id]) + if len(total_messages) >= expected_num:
Review comment: Supporting only full-output matching is what I intend. I found it hard to know whether more output will appear after all the expected data has been pulled, and I try to avoid adding extra wait time after that point, which may be inefficient in some cases. So in `_matches` (line 82), I verify an exact match of the two datasets.
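The exact-match rule relies on comparing the two datasets as multisets, as the quoted `_matches` does with `Counter(self.messages) == Counter(self.expected_msg)`. The stand-alone sketch below (the helper name `messages_match` is hypothetical) shows what that comparison accepts and rejects.

```python
from collections import Counter


def messages_match(received, expected):
    """Exact multiset comparison: order-insensitive, count-sensitive."""
    return Counter(received) == Counter(expected)


# Arrival order is ignored (Pub/Sub does not guarantee order by default).
print(messages_match(['b', 'a'], ['a', 'b']))        # True
# A missing duplicate fails the match.
print(messages_match(['a'], ['a', 'a']))             # False
# An extra, unexpected message also fails: this is "full output" matching.
print(messages_match(['a', 'b', 'c'], ['a', 'b']))   # False
```

Because the pull loop stops as soon as it has at least the expected number of messages, extras are only caught if they arrive in a batch pulled before the loop exits.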
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83252=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83252 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 17:15 Start Date: 22/Mar/18 17:15 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176502035 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline): self.result = super(TestDataflowRunner, self).run_pipeline(pipeline) if self.result.has_job: - project = pipeline._options.view_as(GoogleCloudOptions).project - region_id = pipeline._options.view_as(GoogleCloudOptions).region - job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. - print ( - 'Found: https://console.cloud.google.com/dataflow/jobsDetail' - '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) + print('Found: %s.' % self.build_console_url(pipeline.options)) if not options.view_as(StandardOptions).streaming: self.result.wait_until_finish() else: - # TODO: Ideally, we want to wait until workers start successfully. - self.wait_until_running() + self.wait_until_in_state(PipelineState.RUNNING) if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) +if options.view_as(StandardOptions).streaming: + self.result.cancel()
Review comment: Because we want the pipeline to work for a bit after setup has completed. Going straight from setup to cancel does not really allow any time for the pipeline to process data.
Issue Time Tracking --- Worklog Id: (was: 83252) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83237=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83237 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 16:47 Start Date: 22/Mar/18 16:47 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176492223 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + +DEFAULT_TIMEOUT = 5 * 60 +MAX_MESSAGES_IN_ONE_PULL = 50 + + +class PubSubMessageMatcher(BaseMatcher): + """Matcher that verifies messages from given subscription. + + This matcher can block the test and keep pulling messages from given + subscription until all expected messages are shown or timeout. + """ + + def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT): +"""Initialize PubSubMessageMatcher object. + +Args: + project: A name string of project. + sub_name: A name string of subscription which is attached to output. + expected_msg: A string list that contains expected message data pulled +from the subscription. + timeout: Timeout in seconds to wait for all expected messages appears. +""" +if pubsub is None: + raise ImportError( + 'PubSub dependencies are not installed.') +if not project: + raise ValueError('Invalid project %s.' % project) +if not sub_name: + raise ValueError('Invalid subscription %s.' % sub_name) +if not expected_msg or not isinstance(expected_msg, list): + raise ValueError('Invalid expected messages %s.' % expected_msg) + +self.project = project +self.sub_name = sub_name +self.expected_msg = expected_msg +self.timeout = timeout +self.messages = None + + def _matches(self, _): +if not self.messages: + subscription = (pubsub + .Client(project=self.project) + .subscription(self.sub_name)) + self.messages = self._wait_for_messages(subscription,
Review comment: Returning `False` looks better, since the remaining matchers can then still be executed instead of breaking here, and the pipeline will be canceled at the end of the test. Will fix.
Issue Time Tracking --- Worklog Id: (was: 83237) Time Spent: 5.5h (was: 5h 20m)
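The reasoning behind returning `False` (a reported mismatch lets later checks still run, while an exception aborts them) can be illustrated without hamcrest. In the sketch below, `run_matchers` is a hypothetical helper that evaluates plain predicates; it models the argument only and is not how `hc_assert_that` or any hamcrest combinator is implemented.

```python
def run_matchers(item, matchers):
    """Evaluate every (name, predicate) pair; return names that failed.

    A predicate that returns False is simply recorded, so the remaining
    predicates still run. A predicate that raises propagates the
    exception and the remaining predicates are skipped.
    """
    failures = []
    for name, predicate in matchers:
        if not predicate(item):
            failures.append(name)
    return failures


checked = []


def pubsub_timed_out(_):
    checked.append('pubsub')
    return False          # timeout reported as a mismatch, not an exception


def job_state_ok(_):
    checked.append('job_state')
    return True


print(run_matchers(None, [('pubsub', pubsub_timed_out),
                          ('job_state', job_state_ok)]))  # ['pubsub']
print(checked)                                            # ['pubsub', 'job_state']
```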
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83227=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83227 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 16:41 Start Date: 22/Mar/18 16:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176489484 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + +DEFAULT_TIMEOUT = 5 * 60 +MAX_MESSAGES_IN_ONE_PULL = 50 + + +class PubSubMessageMatcher(BaseMatcher): + """Matcher that verifies messages from given subscription. + + This matcher can block the test and keep pulling messages from given + subscription until all expected messages are shown or timeout. + """ + + def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT): +"""Initialize PubSubMessageMatcher object. + +Args: + project: A name string of project. + sub_name: A name string of subscription which is attached to output. + expected_msg: A string list that contains expected message data pulled +from the subscription. + timeout: Timeout in seconds to wait for all expected messages appears. +""" +if pubsub is None: + raise ImportError( + 'PubSub dependencies are not installed.') +if not project: + raise ValueError('Invalid project %s.' % project) +if not sub_name: + raise ValueError('Invalid subscription %s.' % sub_name) +if not expected_msg or not isinstance(expected_msg, list): + raise ValueError('Invalid expected messages %s.' 
% expected_msg) + +self.project = project +self.sub_name = sub_name +self.expected_msg = expected_msg +self.timeout = timeout +self.messages = None + + def _matches(self, _): +if not self.messages: + subscription = (pubsub + .Client(project=self.project) + .subscription(self.sub_name)) + self.messages = self._wait_for_messages(subscription, + len(self.expected_msg), + self.timeout) +return Counter(self.messages) == Counter(self.expected_msg) + + def _wait_for_messages(self, subscription, expected_num, timeout): +"""Wait for messages from given subscription.""" +logging.debug('Start pulling messages from %s', subscription.full_name) +total_messages = [] +start_time = time.time() +while time.time() - start_time <= timeout: + pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL) + for ack_id, message in pulled: +total_messages.append(message.data) +subscription.acknowledge([ack_id]) + if len(total_messages) >= expected_num:
Review comment: Supporting only full-output matching is what I intend. I found it hard to know whether more output will appear after all the expected data has been pulled, and I try to avoid adding extra wait time after that point, which may be inefficient in some cases. So in `_matches` (line 82), I verify an exact match of the two datasets.
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83228=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83228 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 16:41 Start Date: 22/Mar/18 16:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176490292 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline): self.result = super(TestDataflowRunner, self).run_pipeline(pipeline) if self.result.has_job: - project = pipeline._options.view_as(GoogleCloudOptions).project - region_id = pipeline._options.view_as(GoogleCloudOptions).region - job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. - print ( - 'Found: https://console.cloud.google.com/dataflow/jobsDetail' - '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) + print('Found: %s.' % self.build_console_url(pipeline.options)) if not options.view_as(StandardOptions).streaming: self.result.wait_until_finish() else: - # TODO: Ideally, we want to wait until workers start successfully. - self.wait_until_running() + self.wait_until_in_state(PipelineState.RUNNING) if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) +if options.view_as(StandardOptions).streaming: + self.result.cancel()
Review comment: Sure. Can you explain in more detail why we want to add a wait? My understanding is that it's better to call cancel after the workers have started successfully, right?
Issue Time Tracking --- Worklog Id: (was: 83228) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83229=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83229 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 16:41 Start Date: 22/Mar/18 16:41 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176489600 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + +DEFAULT_TIMEOUT = 5 * 60 +MAX_MESSAGES_IN_ONE_PULL = 50 + + +class PubSubMessageMatcher(BaseMatcher): + """Matcher that verifies messages from given subscription. + + This matcher can block the test and keep pulling messages from given + subscription until all expected messages are shown or timeout. + """ + + def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT): +"""Initialize PubSubMessageMatcher object. + +Args: + project: A name string of project. + sub_name: A name string of subscription which is attached to output. + expected_msg: A string list that contains expected message data pulled +from the subscription. + timeout: Timeout in seconds to wait for all expected messages appears. +""" +if pubsub is None: + raise ImportError( + 'PubSub dependencies are not installed.') +if not project: + raise ValueError('Invalid project %s.' % project) +if not sub_name: + raise ValueError('Invalid subscription %s.' % sub_name) +if not expected_msg or not isinstance(expected_msg, list): + raise ValueError('Invalid expected messages %s.' 
% expected_msg) + +self.project = project +self.sub_name = sub_name +self.expected_msg = expected_msg +self.timeout = timeout +self.messages = None + + def _matches(self, _): +if not self.messages: + subscription = (pubsub + .Client(project=self.project) + .subscription(self.sub_name)) + self.messages = self._wait_for_messages(subscription, + len(self.expected_msg), + self.timeout) +return Counter(self.messages) == Counter(self.expected_msg) + + def _wait_for_messages(self, subscription, expected_num, timeout): +"""Wait for messages from given subscription.""" +logging.debug('Start pulling messages from %s', subscription.full_name) +total_messages = [] +start_time = time.time() +while time.time() - start_time <= timeout: + pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL) + for ack_id, message in pulled: +total_messages.append(message.data) +subscription.acknowledge([ack_id]) + if len(total_messages) >= expected_num: +return total_messages + time.sleep(1) + +raise RuntimeError('Timeout after %d sec. Received %d messages from %s.' % + (timeout, len(total_messages), subscription.full_name)) + + def describe_to(self, description): +description.append_text( +'Expected %d messages.' % len(self.expected_msg)) + + def describe_mismatch(self, _, mismatch_description): +diff = set(self.expected_msg) - set(self.messages)
Review comment: Good point! Will change.
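The reviewer's remark on `describe_mismatch` is not quoted in this message, so what exactly "will change" refers to is not recorded here. One limitation of the quoted set difference is that it ignores duplicate counts and never reports unexpected extra messages. A `Counter`-based diff, sketched below as a hypothetical `describe_message_diff` helper, reports both, and also tolerates `None` when nothing was pulled.

```python
from collections import Counter


def describe_message_diff(expected, received):
    """Describe missing and unexpected messages, respecting duplicates."""
    expected_counts = Counter(expected)
    received_counts = Counter(received or [])   # treat None as "nothing pulled"
    missing = expected_counts - received_counts     # keeps positive counts only
    unexpected = received_counts - expected_counts
    parts = []
    if missing:
        parts.append('missing %s' % sorted(missing.elements()))
    if unexpected:
        parts.append('unexpected %s' % sorted(unexpected.elements()))
    return '; '.join(parts) or 'no difference'


print(describe_message_diff(['a', 'a', 'b'], ['a', 'c']))
# missing ['a', 'b']; unexpected ['c']
print(set(['a', 'a', 'b']) - set(['a', 'c']))   # {'b'}
```

For the same inputs the plain set difference reports only `{'b'}`, hiding both the missing duplicate `'a'` and the extra `'c'`.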
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83058=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83058 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 04:19 Start Date: 22/Mar/18 04:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176304644 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+"""Initialize PubSubMessageMatcher object.
+
+Args:
+  project: A name string of project.
+  sub_name: A name string of subscription which is attached to output.
+  expected_msg: A string list that contains expected message data pulled
+from the subscription.
+  timeout: Timeout in seconds to wait for all expected messages appears.
+"""
+if pubsub is None:
+  raise ImportError(
+  'PubSub dependencies are not installed.')
+if not project:
+  raise ValueError('Invalid project %s.' % project)
+if not sub_name:
+  raise ValueError('Invalid subscription %s.' % sub_name)
+if not expected_msg or not isinstance(expected_msg, list):
+  raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+self.project = project
+self.sub_name = sub_name
+self.expected_msg = expected_msg
+self.timeout = timeout
+self.messages = None
+
+  def _matches(self, _):
+if not self.messages:
+  subscription = (pubsub
+  .Client(project=self.project)
+  .subscription(self.sub_name))
+  self.messages = self._wait_for_messages(subscription, Review comment: Should this return `False` in case of a timeout, instead of raising an exception?
Issue Time Tracking --- Worklog Id: (was: 83058) Time Spent: 5h
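One way to act on the reviewer's question is to have the polling helper return whatever it collected when the deadline passes. `_matches` then turns a short result into `False`, so hamcrest reports a normal mismatch instead of an error. This sketch rests on a few assumptions. `pull` is a hypothetical zero-argument callable that returns a list of payloads, standing in for `subscription.pull` plus acknowledgement. `clock` and `sleep` are injectable only so the loop can be tested without waiting.

```python
import time
from collections import Counter


def wait_for_messages(pull, expected_num, timeout, poll_interval=1,
                      clock=time.time, sleep=time.sleep):
  """Pull until expected_num payloads arrive or timeout seconds pass.

  Returns the payloads collected so far instead of raising on timeout.
  """
  collected = []
  start = clock()
  while clock() - start <= timeout:
    collected.extend(pull())
    if len(collected) >= expected_num:
      break
    sleep(poll_interval)
  return collected


def matches(pull, expected, timeout, **wait_kwargs):
  """Hypothetical _matches body: False on timeout or on content mismatch."""
  received = wait_for_messages(pull, len(expected), timeout, **wait_kwargs)
  return Counter(received) == Counter(expected)
```

The trade-off is that the timeout detail (how many messages arrived, and from which subscription) no longer appears in an exception message. It would have to be surfaced through `describe_mismatch` instead.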
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83057=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83057 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 04:19 Start Date: 22/Mar/18 04:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176304742 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + +DEFAULT_TIMEOUT = 5 * 60 +MAX_MESSAGES_IN_ONE_PULL = 50 + + +class PubSubMessageMatcher(BaseMatcher): + """Matcher that verifies messages from given subscription. + + This matcher can block the test and keep pulling messages from given + subscription until all expected messages are shown or timeout. + """ + + def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT): +"""Initialize PubSubMessageMatcher object. + +Args: + project: A name string of project. + sub_name: A name string of subscription which is attached to output. + expected_msg: A string list that contains expected message data pulled +from the subscription. + timeout: Timeout in seconds to wait for all expected messages appears. +""" +if pubsub is None: + raise ImportError( + 'PubSub dependencies are not installed.') +if not project: + raise ValueError('Invalid project %s.' % project) +if not sub_name: + raise ValueError('Invalid subscription %s.' % sub_name) +if not expected_msg or not isinstance(expected_msg, list): + raise ValueError('Invalid expected messages %s.' 
% expected_msg)
+
+self.project = project
+self.sub_name = sub_name
+self.expected_msg = expected_msg
+self.timeout = timeout
+self.messages = None
+
+  def _matches(self, _):
+if not self.messages:
+  subscription = (pubsub
+  .Client(project=self.project)
+  .subscription(self.sub_name))
+  self.messages = self._wait_for_messages(subscription,
+  len(self.expected_msg),
+  self.timeout)
+return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+"""Wait for messages from given subscription."""
+logging.debug('Start pulling messages from %s', subscription.full_name)
+total_messages = []
+start_time = time.time()
+while time.time() - start_time <= timeout:
+  pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+  for ack_id, message in pulled:
+total_messages.append(message.data)
+subscription.acknowledge([ack_id])
+  if len(total_messages) >= expected_num: Review comment: This is a little strange. This matcher will only be useful when testing for the full output. It would not be useful for a use case that checks, say, that elements x, y, z are part of the whole output. Is this what you intended to do? Issue Time Tracking --- Worklog Id: (was: 83057)
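The reviewer's point is that comparing `Counter`s checks the full output, and the pull loop also stops as soon as `expected_num` messages have arrived. A containment check is a different predicate. Below is a minimal sketch of both, assuming lists of payloads; the function names are illustrative only. A containment matcher would also need to keep pulling until every expected payload has been seen, rather than stopping at a message count.

```python
from collections import Counter


def equals_exactly(received, expected):
  """Full-output check, as in the matcher under review."""
  return Counter(received) == Counter(expected)


def contains_all(received, expected):
  """Containment check: every expected payload occurs at least as often.

  Counter subtraction drops zero and negative counts, so the difference
  is empty exactly when `expected` is a sub-multiset of `received`.
  """
  return not (Counter(expected) - Counter(received))
```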
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83060=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83060 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 04:19 Start Date: 22/Mar/18 04:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176304852 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + +DEFAULT_TIMEOUT = 5 * 60 +MAX_MESSAGES_IN_ONE_PULL = 50 + + +class PubSubMessageMatcher(BaseMatcher): + """Matcher that verifies messages from given subscription. + + This matcher can block the test and keep pulling messages from given + subscription until all expected messages are shown or timeout. + """ + + def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT): +"""Initialize PubSubMessageMatcher object. + +Args: + project: A name string of project. + sub_name: A name string of subscription which is attached to output. + expected_msg: A string list that contains expected message data pulled +from the subscription. + timeout: Timeout in seconds to wait for all expected messages appears. +""" +if pubsub is None: + raise ImportError( + 'PubSub dependencies are not installed.') +if not project: + raise ValueError('Invalid project %s.' % project) +if not sub_name: + raise ValueError('Invalid subscription %s.' % sub_name) +if not expected_msg or not isinstance(expected_msg, list): + raise ValueError('Invalid expected messages %s.' 
% expected_msg)
+
+self.project = project
+self.sub_name = sub_name
+self.expected_msg = expected_msg
+self.timeout = timeout
+self.messages = None
+
+  def _matches(self, _):
+if not self.messages:
+  subscription = (pubsub
+  .Client(project=self.project)
+  .subscription(self.sub_name))
+  self.messages = self._wait_for_messages(subscription,
+  len(self.expected_msg),
+  self.timeout)
+return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+"""Wait for messages from given subscription."""
+logging.debug('Start pulling messages from %s', subscription.full_name)
+total_messages = []
+start_time = time.time()
+while time.time() - start_time <= timeout:
+  pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+  for ack_id, message in pulled:
+total_messages.append(message.data)
+subscription.acknowledge([ack_id])
+  if len(total_messages) >= expected_num:
+return total_messages
+  time.sleep(1)
+
+raise RuntimeError('Timeout after %d sec. Received %d messages from %s.' %
+   (timeout, len(total_messages), subscription.full_name))
+
+  def describe_to(self, description):
+description.append_text(
+'Expected %d messages.' % len(self.expected_msg))
+
+  def describe_mismatch(self, _, mismatch_description):
+diff = set(self.expected_msg) - set(self.messages) Review comment: don't you want to use a counter here too? That is what you used for matching. (For example, if you are checking for messages x, x and only x appears, it will mismatch (correctly), however
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83056=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83056 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 04:19 Start Date: 22/Mar/18 04:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176304163 ## File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py ## @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PubSub verifier used for end-to-end test.""" + +import logging +import time +from collections import Counter + +from hamcrest.core.base_matcher import BaseMatcher + +__all__ = ['PubSubMessageMatcher'] + + +# Protect against environments where pubsub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+"""Initialize PubSubMessageMatcher object.
+
+Args:
+  project: A name string of project.
+  sub_name: A name string of subscription which is attached to output.
+  expected_msg: A string list that contains expected message data pulled
+from the subscription.
+  timeout: Timeout in seconds to wait for all expected messages appears.
+"""
+if pubsub is None:
+  raise ImportError(
+  'PubSub dependencies are not installed.')
+if not project:
+  raise ValueError('Invalid project %s.' % project)
+if not sub_name:
+  raise ValueError('Invalid subscription %s.' % sub_name)
+if not expected_msg or not isinstance(expected_msg, list): Review comment: `if not isinstance(expected_msg, list):` should be enough? Issue Time Tracking --- Worklog Id: (was: 83056) Time Spent: 5h (was: 4h 50m)
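The two checks differ only on the empty list: `isinstance(expected_msg, list)` alone accepts `[]`, which describes a test that expects no output at all. Below is a sketch of a check that keeps rejecting it; `validate_expected_msg` is a hypothetical helper. It also wraps the value in a one-element tuple before `%`-formatting, because `'... %s.' % expected_msg` raises `TypeError` if a caller passes a tuple with more than one element.

```python
def validate_expected_msg(expected_msg):
  """Return expected_msg if it is a non-empty list, else raise ValueError."""
  # isinstance() alone would accept [], i.e. a test that expects no output.
  if not isinstance(expected_msg, list) or not expected_msg:
    # The 1-tuple keeps %-formatting safe when a tuple is passed by mistake.
    raise ValueError('Invalid expected messages %s.' % (expected_msg,))
  return expected_msg
```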
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83055=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83055 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 04:19 Start Date: 22/Mar/18 04:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176305077 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline): self.result = super(TestDataflowRunner, self).run_pipeline(pipeline) if self.result.has_job: - project = pipeline._options.view_as(GoogleCloudOptions).project - region_id = pipeline._options.view_as(GoogleCloudOptions).region - job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. - print ( - 'Found: https://console.cloud.google.com/dataflow/jobsDetail' - '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) + print('Found: %s.' % self.build_console_url(pipeline.options)) if not options.view_as(StandardOptions).streaming: self.result.wait_until_finish() else: - # TODO: Ideally, we want to wait until workers start successfully. - self.wait_until_running() + self.wait_until_in_state(PipelineState.RUNNING) if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) +if options.view_as(StandardOptions).streaming: + self.result.cancel() Review comment: Do you want to cancel the job as soon as it reaches the running state? Should we wait a bit first?
Issue Time Tracking --- Worklog Id: (was: 83055) Time Spent: 4h 50m (was: 4h 40m)
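In the diff above, `self.result.cancel()` runs only after `hc_assert_that` passes, so a failing matcher would leave the streaming job running. Below is a sketch of one alternative, under these assumptions:

- `result` is any object with a `cancel()` method.
- Each matcher is a plain callable that raises on failure.
- `grace_period_sec` is a hypothetical knob for the reviewer's "wait a bit first".

A blocking verifier such as `PubSubMessageMatcher` already provides some of that wait while it pulls.

```python
import time


def verify_then_cancel(result, matchers, grace_period_sec=0,
                       sleep=time.sleep):
  """Run matchers against a running streaming job, then always cancel it."""
  if grace_period_sec:
    # Optional pause so workers can start producing output.
    sleep(grace_period_sec)
  try:
    for matcher in matchers:
      matcher(result)  # Each matcher raises AssertionError on failure.
  finally:
    # Cancel even when a matcher fails, so no streaming job is leaked.
    result.cancel()
```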
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83059=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83059 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 04:19 Start Date: 22/Mar/18 04:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#discussion_r176303844 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -47,24 +50,32 @@ class StreamingWordCountIT(unittest.TestCase): def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) +self.project = self.test_pipeline.get_option('project') +self.identifier = self._generate_identifier() Review comment: You could use uuid instead. Issue Time Tracking --- Worklog Id: (was: 83059) Time Spent: 5h (was: 4h 50m)
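Below is a sketch of the reviewer's suggestion. It assumes the identifier is only used to make PubSub topic and subscription names unique per run. `generate_identifier` is a hypothetical stand-in for `_generate_identifier()`, whose body is not shown in the diff. `uuid.uuid4().hex` yields 32 lowercase hexadecimal characters. Appended to a prefix that starts with a letter, they should keep names within PubSub's allowed character set.

```python
import uuid

# Prefix taken from the streaming wordcount test module in this digest.
INPUT_TOPIC = 'wc_topic_input'


def generate_identifier():
  """Return a random, practically collision-free suffix for resource names."""
  return uuid.uuid4().hex


identifier = generate_identifier()
input_topic_name = INPUT_TOPIC + '_' + identifier
```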
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83001=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83001 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 00:33 Start Date: 22/Mar/18 00:33 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#issuecomment-375140129 +R: @aaltay PTAL Issue Time Tracking --- Worklog Id: (was: 83001) Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83002 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 00:33 Start Date: 22/Mar/18 00:33 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930#issuecomment-375140129 +R: @aaltay Ready to review. PTAL Issue Time Tracking --- Worklog Id: (was: 83002) Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83000=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83000 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 22/Mar/18 00:30 Start Date: 22/Mar/18 00:30 Worklog Time Spent: 10m Work Description: markflyhigh opened a new pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK URL: https://github.com/apache/beam/pull/4930 Complete the Python end-to-end testing framework so that it supports streaming pipelines that use PubSub. Improvements are:
- Add `PubSubMessageMatcher`, a custom hamcrest matcher that pulls messages from a given subscription and verifies their content.
- Have the test framework automatically cancel the streaming pipeline after all matchers are verified.
- A few minor improvements and cleanups in `TestDataflowRunner`.

With this change, `StreamingWordCountITTest` can verify output messages and cancel the pipeline. The overall test workflow is:
1. Set up PubSub topics and subscriptions.
2. Build test options, including the required pipeline options and test verifiers.
3. Inject data into the input topic.
4. Call `StreamingWordCount.run` with the above options to start the pipeline.
5. Wait until the pipeline starts running.
6. Run verifiers.
7. Cancel the streaming job.
8. Clean up PubSub topics and subscriptions.

Note: If the other two improvement PRs ([PR-4921](https://github.com/apache/beam/pull/4921), [PR-4922](https://github.com/apache/beam/pull/4922)) are merged before this one, this PR must be rebased before it is merged. Follow this checklist to help us incorporate your contribution quickly and easily:
- [x] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
- [x] Write a pull request description that is detailed enough to understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

Issue Time Tracking --- Worklog Id: (was: 83000) Time Spent: 4h 20m (was: 4h 10m)
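The test workflow in this PR description can be sketched as a small driver. It guarantees that the job is cancelled and the PubSub resources are cleaned up even if a step fails. This is an illustration under assumptions, not the PR's code. Every argument is a hypothetical caller-supplied callable, and none of the parameter names are Beam APIs.

```python
def run_streaming_e2e(setup_pubsub, build_options, inject_data,
                      start_pipeline, wait_until_running, verifiers,
                      cleanup_pubsub):
  """Drive the streaming end-to-end test steps in order.

  Returns the pipeline result. Cancellation (step 7) and PubSub cleanup
  (step 8) run in finally blocks, so they happen even on failure.
  """
  setup_pubsub()                          # 1. Topics and subscriptions.
  try:
    options = build_options()             # 2. Build test options.
    inject_data()                         # 3. Publish input messages.
    result = start_pipeline(options)      # 4. Start the streaming job.
    try:
      wait_until_running(result)          # 5. Block until RUNNING.
      for verify in verifiers:            # 6. Each raises on failure.
        verify(result)
    finally:
      result.cancel()                     # 7. Stop the streaming job.
    return result
  finally:
    cleanup_pubsub()                      # 8. Delete topics/subscriptions.
```

Nesting the two `try` blocks means a failure before the job starts still triggers cleanup. It skips cancellation, because there is no job to cancel yet.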
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82606=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82606 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 21/Mar/18 01:58 Start Date: 21/Mar/18 01:58 Worklog Time Spent: 10m Work Description: aaltay closed pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 12f73510873..7ef95d85f1a 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -36,7 +36,7 @@ def split_fn(lines): import re - return re.findall(r'[A-Za-z\']+', lines) + return re.findall(r'[A-Za-z0-9\']+', lines) def run(argv=None): diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py new file mode 100644 index 000..a95e5fa8f53 --- /dev/null +++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test blocks until the job is manually terminated. +""" + +import logging +import unittest + +from hamcrest.core.core.allof import all_of +from nose.plugins.attrib import attr + +from apache_beam.examples import streaming_wordcount +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline + +INPUT_TOPIC = 'wc_topic_input' +OUTPUT_TOPIC = 'wc_topic_output' +INPUT_SUB = 'wc_subscription_input' +OUTPUT_SUB = 'wc_subscription_output' + +DEFAULT_INPUT_NUMBERS = 500 + + +class StreamingWordCountIT(unittest.TestCase): + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) + +# Set up PubSub environment. 
+from google.cloud import pubsub +self.pubsub_client = pubsub.Client( +project=self.test_pipeline.get_option('project')) +self.input_topic = self.pubsub_client.topic(INPUT_TOPIC) +self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC) +self.input_sub = self.input_topic.subscription(INPUT_SUB) +self.output_sub = self.output_topic.subscription(OUTPUT_SUB) + +self._cleanup_pubsub() + +self.input_topic.create() +self.output_topic.create() +test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) +self.input_sub.create() +self.output_sub.create() + + def _inject_numbers(self, topic, num_messages): +"""Inject numbers as test data to PubSub.""" +logging.debug('Injecting %d numbers to topic %s', + num_messages, topic.full_name) +for n in range(num_messages): + topic.publish(str(n)) + + def _cleanup_pubsub(self): +test_utils.cleanup_subscriptions([self.input_sub, self.output_sub]) +test_utils.cleanup_topics([self.input_topic, self.output_topic]) + + def tearDown(self): +self._cleanup_pubsub() + + @attr('developing_test') + def test_streaming_wordcount_it(self): +# Set extra options to the pipeline for test purpose +pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)] +extra_opts = {'input_sub': self.input_sub.full_name, + 'output_topic': self.output_topic.full_name, + 'on_success_matcher': all_of(*pipeline_verifiers)} + +# Generate input data and inject to
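The `split_fn` change in this diff matters for the test data. `_inject_numbers` publishes `str(n)`, which contains only digits, and the old character class has no digits, so it would find no words at all in the injected input. Below is a minimal check of both patterns, copied from the diff into two illustrative functions:

```python
import re


def split_old(line):
  """split_fn before this PR: letters and apostrophes only."""
  return re.findall(r'[A-Za-z\']+', line)


def split_new(line):
  """split_fn after this PR: digits count as word characters too."""
  return re.findall(r'[A-Za-z0-9\']+', line)


# The integration test publishes str(n) for n in range(500).
print(split_old('42'), split_new('42'))  # prints: [] ['42']
```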
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82480=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82480 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 21:00 Start Date: 20/Mar/18 21:00 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175920707 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() Review comment: Can you explain more about what other verification we could do here? It would probably be good to do it here. Issue Time Tracking --- Worklog Id: (was: 82480) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82477=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82477 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 20:53 Start Date: 20/Mar/18 20:53 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175918622 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): Review comment: sg, will change. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82477) Time Spent: 3h 50m (was: 3h 40m) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 3h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82456=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82456 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 20:10 Start Date: 20/Mar/18 20:10 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175906266 ## File path: sdks/python/apache_beam/testing/test_utils.py ## @@ -129,3 +131,47 @@ def delete_files(file_paths): raise RuntimeError('Clean up failed. Invalid file path: %s.' % file_paths) FileSystems.delete(file_paths) + + +def wait_for_subscriptions_created(subs, timeout=60): + """Wait for all PubSub subscriptions are created.""" + return _wait_until_all_exist(subs, timeout) + + +def wait_for_topics_created(topics, timeout=60): + """Wait for all PubSub topics are created.""" + return _wait_until_all_exist(topics, timeout) + + +def _wait_until_all_exist(components, timeout): + need_wait = set(components) + start_time = time.time() + while time.time() - start_time <= timeout: +for c in components: Review comment: OK. Maybe add a TODO to clean this up later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82456) Time Spent: 3h 40m (was: 3.5h) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82453=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82453 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 20:06 Start Date: 20/Mar/18 20:06 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175905248 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): Review comment: It is fair to make it public. (We can also use it later to address this todo: https://github.com/apache/beam/blob/b5041e9ab5c5fbb4435edb6fdcab78bc3483e86c/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L989) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 82453) Time Spent: 3.5h (was: 3h 20m) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82450=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82450 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 20:04 Start Date: 20/Mar/18 20:04 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175904585 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): +return job_state in [ +PipelineState.STOPPED, PipelineState.DONE, +PipelineState.FAILED, PipelineState.CANCELLED, +PipelineState.UPDATED, PipelineState.DRAINED, +] + + def wait_until_running(self): Review comment: Sounds good. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 82450) Time Spent: 3h 10m (was: 3h) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h
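The terminal-state check and the RUNNING wait under review can be sketched as a small polling loop. This is a minimal, hypothetical sketch, not the code in PR #4874: `JobState` stands in for `PipelineState`, `get_state` stands in for however the runner fetches the current job state, and the `timeout`, `poll_interval`, `clock`, and `sleep` parameters are illustrative (the last two exist only so the loop can be exercised without real waiting). The terminal set mirrors the list in the diff.

```python
import enum
import time


class JobState(enum.Enum):
    """Hypothetical stand-in for Beam's PipelineState."""
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    STOPPED = 'STOPPED'
    DONE = 'DONE'
    FAILED = 'FAILED'
    CANCELLED = 'CANCELLED'
    UPDATED = 'UPDATED'
    DRAINED = 'DRAINED'


# Same states as the list in the diff under review.
TERMINAL_STATES = frozenset([
    JobState.STOPPED, JobState.DONE,
    JobState.FAILED, JobState.CANCELLED,
    JobState.UPDATED, JobState.DRAINED,
])


def is_in_terminate_state(state):
    """Public helper: True if `state` is one of the terminal states above."""
    return state in TERMINAL_STATES


def wait_until_running(get_state, timeout=600, poll_interval=5,
                       clock=time.monotonic, sleep=time.sleep):
    """Poll get_state() until it returns RUNNING, then return that state.

    Raises RuntimeError if the job reaches a terminal state first, or if it
    is still not RUNNING after `timeout` seconds.
    """
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state == JobState.RUNNING:
            return state
        if is_in_terminate_state(state):
            raise RuntimeError(
                'Job reached terminal state %s before RUNNING.' % state.name)
        if clock() >= deadline:
            raise RuntimeError(
                'Job not RUNNING after %d seconds (last state: %s).'
                % (timeout, state.name))
        sleep(poll_interval)


# Usage: a job that is PENDING twice and then RUNNING.
_states = iter([JobState.PENDING, JobState.PENDING, JobState.RUNNING])
print(wait_until_running(lambda: next(_states), sleep=lambda _: None).name)
# → RUNNING
```

Raising on a terminal state, rather than only on timeout, lets a streaming test fail fast when the job dies during startup instead of waiting out the full timeout.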
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82451 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 20:04 Start Date: 20/Mar/18 20:04 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175904723 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() Review comment: I understand this, but for verification I think we need more than that. It is fine as one step. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82451) Time Spent: 3h 20m (was: 3h 10m) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
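One way to add the extra verification the reviewer asks for is to check what the pipeline actually writes to the output topic, not just that it reached RUNNING. A minimal sketch, assuming a hypothetical `pull` callable that returns a list of decoded payloads from the output subscription; the function name and its parameters are illustrative and not part of Beam's `test_utils`.

```python
import time


def wait_for_expected_messages(pull, expected, timeout=300, poll_interval=5,
                               clock=time.monotonic, sleep=time.sleep):
    """Poll `pull` until every payload in `expected` has been observed.

    `pull` is a hypothetical callable that returns a (possibly empty) list of
    decoded payloads from the output subscription. Returns the set of
    payloads seen; raises AssertionError if some are still missing after
    `timeout` seconds.
    """
    expected = set(expected)
    seen = set()
    deadline = clock() + timeout
    while True:
        seen.update(pull())
        missing = expected - seen
        if not missing:
            return seen
        if clock() >= deadline:
            raise AssertionError(
                'After %d seconds, %d of %d expected messages were not seen, '
                'e.g. %s' % (timeout, len(missing), len(expected),
                             sorted(missing)[:5]))
        sleep(poll_interval)


# Usage: messages arrive over three pulls.
_batches = iter([['a', 'b'], [], ['c']])
print(sorted(wait_for_expected_messages(
    lambda: next(_batches, []), ['a', 'b', 'c'], sleep=lambda _: None)))
# → ['a', 'b', 'c']
```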
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82445 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 19:55 Start Date: 20/Mar/18 19:55 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175902287 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test is blocked until manually terminate the pipeline job. Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 82445) Time Spent: 2h 40m (was: 2.5h) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82447=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82447 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 19:55 Start Date: 20/Mar/18 19:55 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175902341 ## File path: sdks/python/apache_beam/testing/test_utils.py ## @@ -129,3 +131,47 @@ def delete_files(file_paths): raise RuntimeError('Clean up failed. Invalid file path: %s.' % file_paths) FileSystems.delete(file_paths) + + +def wait_for_subscriptions_created(subs, timeout=60): + """Wait for all PubSub subscriptions are created.""" + return _wait_until_all_exist(subs, timeout) + + +def wait_for_topics_created(topics, timeout=60): + """Wait for all PubSub topics are created.""" + return _wait_until_all_exist(topics, timeout) + + +def _wait_until_all_exist(components, timeout): + need_wait = set(components) + start_time = time.time() + while time.time() - start_time <= timeout: +for c in components: + if c in need_wait and c.exists(): +need_wait.remove(c) +if len(need_wait) == 0: + return True +time.sleep(2) + + raise RuntimeError('Timeout after %d seconds. %d of %d topics/subscriptions ' Review comment: sure. done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 82447) Time Spent: 3h (was: 2h 50m) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82446 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 19:55 Start Date: 20/Mar/18 19:55 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175902299 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test is blocked until manually terminate the pipeline job. 
+""" + +import logging +import unittest + +from hamcrest.core.core.allof import all_of +from nose.plugins.attrib import attr + +from apache_beam.examples import streaming_wordcount +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline + +INPUT_TOPIC = 'wc_topic_input' +OUTPUT_TOPIC = 'wc_topic_output' +INPUT_SUB = 'wc_subscription_input' +OUTPUT_SUB = 'wc_subscription_output' + +DEFAULT_INPUT_NUMBERS = 500 + + +class StreamingWordCountIT(unittest.TestCase): + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) + +# Set up PubSub environment. +from google.cloud import pubsub +self.pubsub_client = pubsub.Client( +project=self.test_pipeline.get_option('project')) +self.input_topic = self.pubsub_client.topic(INPUT_TOPIC) +self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC) +self.input_sub = self.input_topic.subscription(INPUT_SUB) +self.output_sub = self.output_topic.subscription(OUTPUT_SUB) + +self._cleanup_pubsub() + +self.input_topic.create() +self.output_topic.create() +test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) +self.input_sub.create() +self.output_sub.create() + + def _inject_numbers(self, topic, num_messages): +"""Inject numbers as test data to PubSub.""" +logging.debug('Injecting %d numbers to topic %s', + num_messages, topic.full_name) +for n in range(num_messages): + topic.publish(str(n)) + + def _cleanup_pubsub(self): +test_utils.cleanup_subscription([self.input_sub, self.output_sub]) Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 82446) Time Spent: 2h 50m (was: 2h 40m) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82435=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82435 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 19:42 Start Date: 20/Mar/18 19:42 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175896233 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test is blocked until manually terminate the pipeline job. 
+""" + +import logging +import unittest + +from hamcrest.core.core.allof import all_of +from nose.plugins.attrib import attr + +from apache_beam.examples import streaming_wordcount +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline + +INPUT_TOPIC = 'wc_topic_input' +OUTPUT_TOPIC = 'wc_topic_output' +INPUT_SUB = 'wc_subscription_input' +OUTPUT_SUB = 'wc_subscription_output' + +DEFAULT_INPUT_NUMBERS = 500 + + +class StreamingWordCountIT(unittest.TestCase): + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) + +# Set up PubSub environment. +from google.cloud import pubsub +self.pubsub_client = pubsub.Client( +project=self.test_pipeline.get_option('project')) +self.input_topic = self.pubsub_client.topic(INPUT_TOPIC) +self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC) +self.input_sub = self.input_topic.subscription(INPUT_SUB) +self.output_sub = self.output_topic.subscription(OUTPUT_SUB) + +self._cleanup_pubsub() + +self.input_topic.create() +self.output_topic.create() +test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) +self.input_sub.create() +self.output_sub.create() + + def _inject_numbers(self, topic, num_messages): +"""Inject numbers as test data to PubSub.""" +logging.debug('Injecting %d numbers to topic %s', + num_messages, topic.full_name) +for n in range(num_messages): + topic.publish(str(n)) + + def _cleanup_pubsub(self): +test_utils.cleanup_subscription([self.input_sub, self.output_sub]) +test_utils.cleanup_topics([self.input_topic, self.output_topic]) + + def tearDown(self): +self._cleanup_pubsub() + + @attr('developing_test') + def test_streaming_wordcount_it(self): +# Set extra options to the pipeline for test purpose +pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)] +extra_opts = {'input_sub': 
 self.input_sub.full_name, + 'output_topic': self.output_topic.full_name, + 'on_success_matcher': all_of(*pipeline_verifiers)} + +# Generate input data and inject to PubSub. +test_utils.wait_for_subscriptions_created([self.input_sub]) Review comment: Actually, I want to wait and verify that the subscription exists before injecting data into PubSub on the following line. The PubSub API does not seem to guarantee that a subscription is created successfully, or immediately, after calling `subscription.create()`. If the subscription is created after data injection starts, some data may be missed and verification will not be correct. So I added the check here. Issue Time Tracking --- Worklog Id:
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82431=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82431 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 19:39 Start Date: 20/Mar/18 19:39 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175897788 ## File path: sdks/python/apache_beam/testing/test_utils.py ## @@ -129,3 +131,47 @@ def delete_files(file_paths): raise RuntimeError('Clean up failed. Invalid file path: %s.' % file_paths) FileSystems.delete(file_paths) + + +def wait_for_subscriptions_created(subs, timeout=60): + """Wait for all PubSub subscriptions are created.""" + return _wait_until_all_exist(subs, timeout) + + +def wait_for_topics_created(topics, timeout=60): + """Wait for all PubSub topics are created.""" + return _wait_until_all_exist(topics, timeout) + + +def _wait_until_all_exist(components, timeout): + need_wait = set(components) + start_time = time.time() + while time.time() - start_time <= timeout: +for c in components: Review comment: If I replace `components` with `need_wait` on line 150, I get `RuntimeError: Set changed size during iteration`. I guess we cannot modify a set while iterating over it.
Issue Time Tracking --- Worklog Id: (was: 82431) Time Spent: 2h 20m (was: 2h 10m) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h
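The error above is standard Python behavior: a set cannot change size while it is being iterated. A minimal sketch of the same helper iterating over a snapshot (`list(need_wait)`) instead; the components only need an `exists()` method, and the `poll_interval`, `clock`, and `sleep` parameters are hypothetical additions for testability, not part of the helper in the diff.

```python
import time


def set_mutation_error():
    """Return the error raised when a set is modified while iterating it."""
    need_wait = {'topic_a', 'topic_b'}
    try:
        for c in need_wait:
            need_wait.remove(c)
    except RuntimeError as e:
        return str(e)
    return None


def wait_until_all_exist(components, timeout=60, poll_interval=2,
                         clock=time.monotonic, sleep=time.sleep):
    """Wait until every component's exists() returns True."""
    need_wait = set(components)
    total = len(need_wait)
    deadline = clock() + timeout
    while True:
        # Iterate over a snapshot, so removing from need_wait is safe.
        for c in list(need_wait):
            if c.exists():
                need_wait.remove(c)
        if not need_wait:
            return True
        if clock() >= deadline:
            raise RuntimeError(
                'Timeout after %d seconds. %d of %d topics/subscriptions '
                'do not exist yet.' % (timeout, len(need_wait), total))
        sleep(poll_interval)


print(set_mutation_error())
# → Set changed size during iteration
```

Rebuilding the set with a comprehension (`need_wait = {c for c in need_wait if not c.exists()}`) is an equivalent alternative that avoids in-place mutation altogether.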
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82428=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82428 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 19:34 Start Date: 20/Mar/18 19:34 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175896233 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test is blocked until manually terminate the pipeline job. 
+""" + +import logging +import unittest + +from hamcrest.core.core.allof import all_of +from nose.plugins.attrib import attr + +from apache_beam.examples import streaming_wordcount +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline + +INPUT_TOPIC = 'wc_topic_input' +OUTPUT_TOPIC = 'wc_topic_output' +INPUT_SUB = 'wc_subscription_input' +OUTPUT_SUB = 'wc_subscription_output' + +DEFAULT_INPUT_NUMBERS = 500 + + +class StreamingWordCountIT(unittest.TestCase): + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) + +# Set up PubSub environment. +from google.cloud import pubsub +self.pubsub_client = pubsub.Client( +project=self.test_pipeline.get_option('project')) +self.input_topic = self.pubsub_client.topic(INPUT_TOPIC) +self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC) +self.input_sub = self.input_topic.subscription(INPUT_SUB) +self.output_sub = self.output_topic.subscription(OUTPUT_SUB) + +self._cleanup_pubsub() + +self.input_topic.create() +self.output_topic.create() +test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) +self.input_sub.create() +self.output_sub.create() + + def _inject_numbers(self, topic, num_messages): +"""Inject numbers as test data to PubSub.""" +logging.debug('Injecting %d numbers to topic %s', + num_messages, topic.full_name) +for n in range(num_messages): + topic.publish(str(n)) + + def _cleanup_pubsub(self): +test_utils.cleanup_subscription([self.input_sub, self.output_sub]) +test_utils.cleanup_topics([self.input_topic, self.output_topic]) + + def tearDown(self): +self._cleanup_pubsub() + + @attr('developing_test') + def test_streaming_wordcount_it(self): +# Set extra options to the pipeline for test purpose +pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)] +extra_opts = {'input_sub': 
 self.input_sub.full_name, + 'output_topic': self.output_topic.full_name, + 'on_success_matcher': all_of(*pipeline_verifiers)} + +# Generate input data and inject to PubSub. +test_utils.wait_for_subscriptions_created([self.input_sub]) Review comment: Actually, I want to wait and verify that the subscription exists before injecting data into PubSub on the following line. The PubSub API does not seem to guarantee that a subscription is created successfully, or immediately, after calling `subscription.create()`. If the subscription is created after data injection starts, some data may be missed and verification will not be correct. Issue Time Tracking --- Worklog Id: (was: 82428)
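The race described in this comment can be illustrated with an in-memory model. By default, Cloud Pub/Sub delivers a message only to subscriptions that already exist when it is published, so injecting data before the input subscription exists silently drops those messages. The `FakeTopic` class below is a hypothetical stand-in, not the `google.cloud.pubsub` API; in the real test, `test_utils.wait_for_subscriptions_created` is what confirms the subscription exists before injection.

```python
class FakeTopic(object):
    """Hypothetical in-memory topic: a published message is copied only to
    subscriptions that exist at publish time (mirroring Pub/Sub's default)."""

    def __init__(self):
        self.subscriptions = {}

    def create_subscription(self, name):
        self.subscriptions[name] = []

    def publish(self, data):
        for queue in self.subscriptions.values():
            queue.append(data)


def inject_numbers(topic, num_messages):
    """Publish '0', '1', ..., like _inject_numbers in the test under review."""
    for n in range(num_messages):
        topic.publish(str(n))


# Racy order: data is injected before the subscription exists.
racy = FakeTopic()
inject_numbers(racy, 5)
racy.create_subscription('input_sub')
print(racy.subscriptions['input_sub'])  # → []

# Safe order: the subscription exists before any data is injected.
safe = FakeTopic()
safe.create_subscription('input_sub')
inject_numbers(safe, 5)
print(safe.subscriptions['input_sub'])  # → ['0', '1', '2', '3', '4']
```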
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82409=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82409 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 18:53 Start Date: 20/Mar/18 18:53 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175884963 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() Review comment: I feel this is a preferred step for a streaming test before executing verifiers. In the integration test class, people can use `PipelineStateMatcher` to get a clear signal that the pipeline has entered the RUNNING state (like [here](https://github.com/apache/beam/pull/4874/files#diff-a7940d9c8528322f7e20488c31ee47d3R85)). They can then confidently define and add subsequent actions (like customized verifiers).
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82409) Time Spent: 2h (was: 1h 50m) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82402 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 18:43 Start Date: 20/Mar/18 18:43 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175881878 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): +return job_state in [ +PipelineState.STOPPED, PipelineState.DONE, +PipelineState.FAILED, PipelineState.CANCELLED, +PipelineState.UPDATED, PipelineState.DRAINED, +] + + def wait_until_running(self): Review comment: That's exactly what I did in the following [PR](https://github.com/markflyhigh/incubator-beam/pull/2/files#diff-e48ff65b992d87457791341246d796f9R83). How about we generalize it in the future?
Issue Time Tracking --- Worklog Id: (was: 82402) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82397=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82397 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 20/Mar/18 18:36 Start Date: 20/Mar/18 18:36 Worklog Time Spent: 10m Work Description: markflyhigh commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175879745 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): Review comment: Sure. The reason I didn't use it is that `_is_in_terminal_state` is a private function. Should we make it public (rename it to `is_in_terminal_state`)?
Issue Time Tracking --- Worklog Id: (was: 82397) Time Spent: 1h 40m (was: 1.5h)
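One way to act on the "make it public" question is a module-level helper that takes the state as an argument, so callers outside the result class can reuse it. The sketch below is hypothetical: `is_in_terminal_state` and `TERMINAL_STATES` are illustrative names, and the states are modeled as plain strings mirroring the six `PipelineState` values in the `_is_in_terminate_state` diff, rather than Beam's actual `PipelineState` constants.

```python
# Hypothetical module-level helper; states are plain strings mirroring the
# six PipelineState values listed in the _is_in_terminate_state diff.
TERMINAL_STATES = frozenset([
    'STOPPED', 'DONE', 'FAILED', 'CANCELLED', 'UPDATED', 'DRAINED',
])


def is_in_terminal_state(job_state):
    """Return True if job_state is one of the states treated as terminal."""
    return job_state in TERMINAL_STATES


print(is_in_terminal_state('DONE'), is_in_terminal_state('RUNNING'))  # True False
```

A `frozenset` gives constant-time membership checks and cannot be mutated by callers, which suits a shared constant.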
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82091=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82091 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175616259 ## File path: sdks/python/apache_beam/testing/test_utils.py ## @@ -129,3 +131,47 @@ def delete_files(file_paths): raise RuntimeError('Clean up failed. Invalid file path: %s.' % file_paths) FileSystems.delete(file_paths) + + +def wait_for_subscriptions_created(subs, timeout=60): + """Wait for all PubSub subscriptions are created.""" + return _wait_until_all_exist(subs, timeout) + + +def wait_for_topics_created(topics, timeout=60): + """Wait for all PubSub topics are created.""" + return _wait_until_all_exist(topics, timeout) + + +def _wait_until_all_exist(components, timeout): + need_wait = set(components) + start_time = time.time() + while time.time() - start_time <= timeout: +for c in components: Review comment: I think you can do `for c in need_wait` instead, to iterate over a shorter collection and drop the `c in need_wait` check below.
Issue Time Tracking --- Worklog Id: (was: 82091) Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82085=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82085 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175615483 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test is blocked until manually terminate the pipeline job. Review comment: How about: Currently, this test blocks until the job is manually terminated.
Issue Time Tracking --- Worklog Id: (was: 82085) Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82088=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82088 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175616557 ## File path: sdks/python/apache_beam/testing/test_utils.py ## @@ -129,3 +131,47 @@ def delete_files(file_paths): raise RuntimeError('Clean up failed. Invalid file path: %s.' % file_paths) FileSystems.delete(file_paths) + + +def wait_for_subscriptions_created(subs, timeout=60): + """Wait for all PubSub subscriptions are created.""" + return _wait_until_all_exist(subs, timeout) + + +def wait_for_topics_created(topics, timeout=60): + """Wait for all PubSub topics are created.""" + return _wait_until_all_exist(topics, timeout) + + +def _wait_until_all_exist(components, timeout): + need_wait = set(components) + start_time = time.time() + while time.time() - start_time <= timeout: +for c in components: + if c in need_wait and c.exists(): +need_wait.remove(c) +if len(need_wait) == 0: + return True +time.sleep(2) + + raise RuntimeError('Timeout after %d seconds. %d of %d topics/subscriptions ' Review comment: Would it be helpful to log which topics/subscriptions do not exist?
Issue Time Tracking --- Worklog Id: (was: 82088) Time Spent: 50m (was: 40m)
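A minimal sketch of the logging suggestion: build a readable list of whatever is still left in `need_wait` and include it in the timeout message. `describe_missing` and the `Resource` namedtuple are hypothetical; the only assumption about real objects is the `full_name` attribute, which the Pub/Sub topic and subscription objects in this test already use (e.g. `topic.full_name`), with `repr()` as a fallback.

```python
import collections


def describe_missing(need_wait):
    """Return a sorted, comma-separated list of resources not yet found.

    Uses each object's full_name attribute when present and falls back to
    repr() otherwise.
    """
    return ', '.join(
        sorted(getattr(c, 'full_name', repr(c)) for c in need_wait))


# Hypothetical resources standing in for Pub/Sub topics.
Resource = collections.namedtuple('Resource', ['full_name'])
missing = {Resource('projects/p/topics/b'), Resource('projects/p/topics/a')}
print('Timeout after 60 seconds. Still missing: %s' % describe_missing(missing))
# Timeout after 60 seconds. Still missing: projects/p/topics/a, projects/p/topics/b
```

Sorting makes the message deterministic even though `need_wait` is a set, which keeps failure logs easy to compare across runs.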
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82087=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82087 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175590831 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): Review comment: Name this `_is_in_terminal_state`. Can you use `_is_in_terminal_state` from `DataflowPipelineResult`?
Issue Time Tracking --- Worklog Id: (was: 82087) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82089=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82089 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175615705 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test is blocked until manually terminate the pipeline job. 
+""" + +import logging +import unittest + +from hamcrest.core.core.allof import all_of +from nose.plugins.attrib import attr + +from apache_beam.examples import streaming_wordcount +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline + +INPUT_TOPIC = 'wc_topic_input' +OUTPUT_TOPIC = 'wc_topic_output' +INPUT_SUB = 'wc_subscription_input' +OUTPUT_SUB = 'wc_subscription_output' + +DEFAULT_INPUT_NUMBERS = 500 + + +class StreamingWordCountIT(unittest.TestCase): + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) + +# Set up PubSub environment. +from google.cloud import pubsub +self.pubsub_client = pubsub.Client( +project=self.test_pipeline.get_option('project')) +self.input_topic = self.pubsub_client.topic(INPUT_TOPIC) +self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC) +self.input_sub = self.input_topic.subscription(INPUT_SUB) +self.output_sub = self.output_topic.subscription(OUTPUT_SUB) + +self._cleanup_pubsub() + +self.input_topic.create() +self.output_topic.create() +test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) +self.input_sub.create() +self.output_sub.create() + + def _inject_numbers(self, topic, num_messages): +"""Inject numbers as test data to PubSub.""" +logging.debug('Injecting %d numbers to topic %s', + num_messages, topic.full_name) +for n in range(num_messages): + topic.publish(str(n)) + + def _cleanup_pubsub(self): +test_utils.cleanup_subscription([self.input_sub, self.output_sub]) Review comment: Rename to `cleanup_subscriptions`?
Issue Time Tracking --- Worklog Id: (was: 82089) Time Spent: 1h (was: 50m)
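With the plural rename, the two cleanup helpers can share one loop. This is a sketch under assumptions: it treats each topic or subscription as exposing `exists()` and `delete()`, which is assumed here rather than confirmed by this thread, and `_delete_if_exists` and `FakeResource` are hypothetical names. Returning the number of deletions is an illustrative choice that makes the behavior easy to check.

```python
def _delete_if_exists(resources):
    """Delete every resource whose exists() returns True; return the count."""
    deleted = 0
    for r in resources:
        if r.exists():
            r.delete()
            deleted += 1
    return deleted


def cleanup_subscriptions(subs):
    return _delete_if_exists(subs)


def cleanup_topics(topics):
    return _delete_if_exists(topics)


class FakeResource(object):
    """Hypothetical stand-in with exists()/delete() methods."""

    def __init__(self, present):
        self.present = present

    def exists(self):
        return self.present

    def delete(self):
        self.present = False


subs = [FakeResource(True), FakeResource(False)]
topics = [FakeResource(True)]
# Subscriptions first, then topics, matching the order in _cleanup_pubsub.
print(cleanup_subscriptions(subs), cleanup_topics(topics))  # 1 1
```

Checking `exists()` first lets the same helper run safely in both `setUp` (where resources may be left over from an earlier run) and `tearDown`.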
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82090=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82090 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175591205 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() if on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(on_success_matcher)) return self.result + + def _is_in_terminate_state(self, job_state): +return job_state in [ +PipelineState.STOPPED, PipelineState.DONE, +PipelineState.FAILED, PipelineState.CANCELLED, +PipelineState.UPDATED, PipelineState.DRAINED, +] + + def wait_until_running(self): Review comment: Maybe you can generalize this into a "wait until state" helper with a timeout, passing the desired state and the timeout as arguments.
Issue Time Tracking --- Worklog Id: (was: 82090) Time Spent: 1h 10m (was: 1h)
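The generalization suggested above might look like the following sketch: poll a state getter until it reports the desired state, fail fast if the job lands in some other terminal state, and give up after a timeout. It is not the PR's implementation. `wait_until_state` and `get_state` are hypothetical names, states are modeled as plain strings, and the terminal set mirrors the six states in the `_is_in_terminate_state` diff.

```python
import time

# Modeled as strings; mirrors the states in the _is_in_terminate_state diff.
TERMINAL_STATES = frozenset(
    ['STOPPED', 'DONE', 'FAILED', 'CANCELLED', 'UPDATED', 'DRAINED'])


def wait_until_state(get_state, desired_state, timeout=600, interval=5):
    """Poll get_state() until it returns desired_state, then return it.

    Raises RuntimeError if a different terminal state is reached first or
    if timeout seconds elapse.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() <= deadline:
        state = get_state()
        # Check the target first so a terminal target such as DONE works.
        if state == desired_state:
            return state
        if state in TERMINAL_STATES:
            raise RuntimeError('Job reached terminal state %s while waiting '
                               'for %s.' % (state, desired_state))
        time.sleep(interval)
    raise RuntimeError('Timed out after %d seconds waiting for state %s.'
                       % (timeout, desired_state))


states = iter(['PENDING', 'PENDING', 'RUNNING'])
print(wait_until_state(lambda: next(states), 'RUNNING',
                       timeout=5, interval=0.01))  # RUNNING
```

With this shape, the streaming branch could wait for `'RUNNING'` and a batch-style check could wait for `'DONE'` through the same helper, which is the reuse the review comment is after.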
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82092=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82092 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175615878 ## File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py ## @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the streaming wordcount example. + +Important: End-to-end test infrastructure for streaming pipeine in Python SDK +is in development and is not yet available for use. + +Currently, this test is blocked until manually terminate the pipeline job. 
+""" + +import logging +import unittest + +from hamcrest.core.core.allof import all_of +from nose.plugins.attrib import attr + +from apache_beam.examples import streaming_wordcount +from apache_beam.runners.runner import PipelineState +from apache_beam.testing import test_utils +from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher +from apache_beam.testing.test_pipeline import TestPipeline + +INPUT_TOPIC = 'wc_topic_input' +OUTPUT_TOPIC = 'wc_topic_output' +INPUT_SUB = 'wc_subscription_input' +OUTPUT_SUB = 'wc_subscription_output' + +DEFAULT_INPUT_NUMBERS = 500 + + +class StreamingWordCountIT(unittest.TestCase): + + def setUp(self): +self.test_pipeline = TestPipeline(is_integration_test=True) + +# Set up PubSub environment. +from google.cloud import pubsub +self.pubsub_client = pubsub.Client( +project=self.test_pipeline.get_option('project')) +self.input_topic = self.pubsub_client.topic(INPUT_TOPIC) +self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC) +self.input_sub = self.input_topic.subscription(INPUT_SUB) +self.output_sub = self.output_topic.subscription(OUTPUT_SUB) + +self._cleanup_pubsub() + +self.input_topic.create() +self.output_topic.create() +test_utils.wait_for_topics_created([self.input_topic, self.output_topic]) +self.input_sub.create() +self.output_sub.create() + + def _inject_numbers(self, topic, num_messages): +"""Inject numbers as test data to PubSub.""" +logging.debug('Injecting %d numbers to topic %s', + num_messages, topic.full_name) +for n in range(num_messages): + topic.publish(str(n)) + + def _cleanup_pubsub(self): +test_utils.cleanup_subscription([self.input_sub, self.output_sub]) +test_utils.cleanup_topics([self.input_topic, self.output_topic]) + + def tearDown(self): +self._cleanup_pubsub() + + @attr('developing_test') + def test_streaming_wordcount_it(self): +# Set extra options to the pipeline for test purpose +pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)] +extra_opts = {'input_sub': 
self.input_sub.full_name, + 'output_topic': self.output_topic.full_name, + 'on_success_matcher': all_of(*pipeline_verifiers)} + +# Generate input data and inject to PubSub. +test_utils.wait_for_subscriptions_created([self.input_sub]) Review comment: You already created the subscriptions in `setUp`. Do you need to check them here? Issue Time Tracking --- Worklog Id: (was: 82092) Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82086=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82086 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 19/Mar/18 23:27 Start Date: 19/Mar/18 23:27 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#discussion_r175591382 ## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ## @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline): print ( 'Found: https://console.cloud.google.com/dataflow/jobsDetail' '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) -self.result.wait_until_finish() + +if not options.view_as(StandardOptions).streaming: + self.result.wait_until_finish() +else: + # TODO: Ideally, we want to wait until workers start successfully. + self.wait_until_running() Review comment: What is the point of waiting until running (i.e., what are we verifying by doing this)? Is this going to be addressed in the follow-up PR? Issue Time Tracking --- Worklog Id: (was: 82086) > Build test infra for end-to-end streaming test in Python SDK > > > Key: BEAM-3861 > URL: https://issues.apache.org/jira/browse/BEAM-3861 > Project: Beam > Issue Type: Task > Components: testing >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=81389=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81389 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 16/Mar/18 21:45 Start Date: 16/Mar/18 21:45 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#issuecomment-373853545 Rebased against master. It's ready for review. Please take a look, @aaltay. Issue Time Tracking --- Worklog Id: (was: 81389) Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=81232=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81232 ] ASF GitHub Bot logged work on BEAM-3861: Author: ASF GitHub Bot Created on: 16/Mar/18 17:15 Start Date: 16/Mar/18 17:15 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test URL: https://github.com/apache/beam/pull/4874#issuecomment-373783156 +cc: @aaltay Issue Time Tracking --- Worklog Id: (was: 81232) Time Spent: 10m Remaining Estimate: 0h