[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=93005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93005 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 20/Apr/18 01:10
Start Date: 20/Apr/18 01:10
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index d0b53f50d79..5db1878f34f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -37,6 +37,7 @@
 OUTPUT_SUB = 'wc_subscription_output'
 DEFAULT_INPUT_NUMBERS = 500
+WAIT_UNTIL_FINISH_DURATION = 3 * 60 * 1000  # in milliseconds
 class StreamingWordCountIT(unittest.TestCase):
@@ -87,6 +88,7 @@ def test_streaming_wordcount_it(self):
                                                timeout=400)
     extra_opts = {'input_subscription': self.input_sub.full_name,
                   'output_topic': self.output_topic.full_name,
+                  'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 7a2cd4bf1e4..b5f9d77617d 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -649,6 +649,13 @@ def _add_argparse_args(cls, parser):
         default=False,
         help=('Used in unit testing runners without submitting the '
               'actual job.'))
+    parser.add_argument(
+        '--wait_until_finish_duration',
+        default=None,
+        type=int,
+        help='The time to wait (in milliseconds) for test pipeline to finish. '
+             'If it is set to None, it will wait indefinitely until the job '
+             'is finished.')
   def validate(self, validator):
     errors = []
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 765ed245785..eedfa60f9fd 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,6 +18,7 @@
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
 from __future__ import print_function
+import logging
 import time
 from apache_beam.internal import pickler
@@ -37,6 +38,8 @@ def run_pipeline(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
@@ -49,10 +52,11 @@ def run_pipeline(self, pipeline):
       print('Found: %s.' % self.build_console_url(pipeline.options))
     try:
-      if not options.view_as(StandardOptions).streaming:
-        self.result.wait_until_finish()
-      else:
-        self.wait_until_in_state(PipelineState.RUNNING)
+      self.wait_until_in_state(PipelineState.RUNNING)
+
+      if is_streaming and not wait_duration:
+        logging.warning('Waiting indefinitely for streaming job.')
+      self.result.wait_until_finish(duration=wait_duration)
       if on_success_matcher:
         from hamcrest import assert_that as hc_assert_that
@@ -60,7 +64,6 @@ def run_pipeline(self, pipeline):
     finally:
       if not self.result.is_in_terminal_state():
         self.result.cancel()
-      if options.view_as(StandardOptions).streaming:
         self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
     return self.result
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 155190c09a7..0525945f15f 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -102,7 +102,8 @@ def run(self, test_runner_api=True):
     result = super(TestPipeline, self).run(test_runner_api)
     if self.blocking:
       state = result.wait_un
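The order of operations that the `test_dataflow_runner.py` hunks introduce (wait with an optional duration, then cancel if the job has not reached a terminal state) can be sketched without a Dataflow job. This is only an illustration under stated assumptions: `FakeResult` and `run_and_verify` are hypothetical stand-ins invented for this sketch, not Beam classes, and the state names are plain strings rather than `PipelineState` values.

```python
import logging


class FakeResult(object):
  """Hypothetical stand-in for a pipeline result; not a Beam class."""

  def __init__(self, finishes_on_its_own):
    self.finishes_on_its_own = finishes_on_its_own
    self.state = 'RUNNING'
    self.waited_with = []

  def wait_until_finish(self, duration=None):
    # Record the duration (milliseconds, or None for "no limit").
    self.waited_with.append(duration)
    if self.finishes_on_its_own:
      self.state = 'DONE'
    return self.state

  def is_in_terminal_state(self):
    return self.state in ('DONE', 'FAILED', 'CANCELLED')

  def cancel(self):
    self.state = 'CANCELLED'


def run_and_verify(result, is_streaming, wait_duration):
  """Follows the ordering in the diff: wait first, then clean up."""
  try:
    if is_streaming and not wait_duration:
      logging.warning('Waiting indefinitely for streaming job.')
    result.wait_until_finish(duration=wait_duration)
  finally:
    # A job that is still running after the wait is cancelled.
    if not result.is_in_terminal_state():
      result.cancel()
  return result.state


# A batch-like job that finishes by itself, and a streaming-like job that
# never does and is therefore cancelled after the 3-minute wait.
batch_state = run_and_verify(FakeResult(True), False, None)
streaming_state = run_and_verify(FakeResult(False), True, 3 * 60 * 1000)
print(batch_state, streaming_state)
```

In the sketch the streaming-like job ends up cancelled only because the stub returns immediately; a real `wait_until_finish` would block for up to the given duration.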
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92850 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 19/Apr/18 21:18
Start Date: 19/Apr/18 21:18
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#issuecomment-382883690

@aaltay not sure why there are two pythonPreCommit. One passed without test failure. Is it good to merge?

Also I manually tested integration test against Dataflow.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 92850)
Time Spent: 1.5h (was: 1h 20m)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92849 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 19/Apr/18 21:17
Start Date: 19/Apr/18 21:17
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#issuecomment-382883690

@aaltay not sure why there are two pythonPreCommit. One passed without test failure. Is it good to merge?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 92849)
Time Spent: 1h 20m (was: 1h 10m)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92308&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92308 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 18/Apr/18 23:58
Start Date: 18/Apr/18 23:58
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#issuecomment-382565690

I don't think they are related, but I didn't see those failures in PostCommit. I'll rebase the branch and run the tests again.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 92308)
Time Spent: 1h 10m (was: 1h)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92173&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92173 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 18/Apr/18 17:15
Start Date: 18/Apr/18 17:15
Worklog Time Spent: 10m
Work Description: markflyhigh commented on a change in pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#discussion_r182503739

## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
##
@@ -37,10 +38,13 @@ def run_pipeline(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
     options.on_success_matcher = None
+    options.wait_until_finish_duration = None

Review comment:
Understood, will remove it.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 92173)
Time Spent: 1h (was: 50m)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92172 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 18/Apr/18 17:14
Start Date: 18/Apr/18 17:14
Worklog Time Spent: 10m
Work Description: aaltay commented on a change in pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#discussion_r182503406

## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
##
@@ -37,10 +38,13 @@ def run_pipeline(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
     options.on_success_matcher = None
+    options.wait_until_finish_duration = None

Review comment:
It is ok to send it. The problem with `on_success_matcher` was that it contains an `=` character, and we found out that the workers had problems parsing it out of the options.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 92172)
Time Spent: 50m (was: 40m)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
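The review comment above says only that an option value containing `=` was hard for the workers to parse; it does not show the worker code. As a purely hypothetical illustration of how that class of problem can arise, the sketch below compares a parser that splits on every `=` with one that splits on the first `=` only. `parse_naive`, `parse_first_equals`, and the sample value are invented for this example and are not taken from Beam or Dataflow.

```python
def parse_naive(arg):
  """Hypothetical parser that splits on every '='; not Beam worker code."""
  parts = arg.lstrip('-').split('=')
  if len(parts) != 2:
    raise ValueError('cannot parse option: %r' % arg)
  return parts[0], parts[1]


def parse_first_equals(arg):
  """Splits on the first '=' only, so the value may itself contain '='."""
  key, value = arg.lstrip('-').split('=', 1)
  return key, value


plain = '--wait_until_finish_duration=180000'
encoded = '--on_success_matcher=gAJ9cQA='  # made-up value ending in '='

print(parse_naive(plain))
print(parse_first_equals(encoded))
try:
  parse_naive(encoded)
except ValueError as err:
  print(err)
```

An integer value such as the wait duration contains no `=`, which is consistent with the reviewer's point that sending it along is harmless.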
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92162&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92162 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 18/Apr/18 16:47
Start Date: 18/Apr/18 16:47
Worklog Time Spent: 10m
Work Description: markflyhigh commented on a change in pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#discussion_r182495580

## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
##
@@ -37,10 +38,13 @@ def run_pipeline(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
     options.on_success_matcher = None
+    options.wait_until_finish_duration = None

Review comment:
I added it here because of the comment in line 45. If it's okay to send this option to remote executors, I prefer to remove it to keep the code simple.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 92162)
Time Spent: 40m (was: 0.5h)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92027&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92027 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 18/Apr/18 02:00
Start Date: 18/Apr/18 02:00
Worklog Time Spent: 10m
Work Description: aaltay commented on a change in pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#discussion_r182290272

## File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
##
@@ -37,10 +38,13 @@ def run_pipeline(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
     options.on_success_matcher = None
+    options.wait_until_finish_duration = None

Review comment:
Do you need this here? I do not think this is necessary based on BEAM-1889.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 92027)
Time Spent: 0.5h (was: 20m)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=91816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91816 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 17/Apr/18 16:45
Start Date: 17/Apr/18 16:45
Worklog Time Spent: 10m
Work Description: markflyhigh commented on issue #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#issuecomment-382062999

+R: @aaltay

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 91816)
Time Spent: 20m (was: 10m)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming
[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=91545&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91545 ]

ASF GitHub Bot logged work on BEAM-4093:

Author: ASF GitHub Bot
Created on: 16/Apr/18 23:21
Start Date: 16/Apr/18 23:21
Worklog Time Spent: 10m
Work Description: markflyhigh opened a new pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147

Improved `TestDataflowRunner` so that ValidatesRunner tests can run against it in streaming mode by specifying `--wait_until_finish_duration` on the command line. Since a streaming pipeline cannot terminate itself, the framework will cancel the job after `wait_until_finish_duration` has elapsed.

Note: This change only enables a basic level of verification for the streaming job. Any failure that leads to an unsuccessful termination state (like FAILED) will be caught and cause the test to fail. However, `assert_that` failures cannot be caught at this time.

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
- [ ] Write a pull request description that is detailed enough to understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 91545)
Time Spent: 10m
Remaining Estimate: 0h

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
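The pull request description above refers to passing `--wait_until_finish_duration` on the command line. A minimal sketch of how such an integer option behaves when declared with the standard-library `argparse` module follows; it uses a bare `ArgumentParser` rather than Beam's `PipelineOptions` machinery, so the surrounding setup is an assumption made only to keep the example self-contained.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--wait_until_finish_duration',
    default=None,
    type=int,
    help='The time to wait (in milliseconds) for test pipeline to finish.')

# 3 minutes expressed in milliseconds, the value the streaming wordcount
# integration test in this pull request passes.
args = parser.parse_args(['--wait_until_finish_duration', str(3 * 60 * 1000)])

# When the flag is omitted the value stays None, which the option's help
# text describes as waiting indefinitely.
unset = parser.parse_args([])

print(args.wait_until_finish_duration)
print(unset.wait_until_finish_duration)
```

Keeping the default at `None` rather than `0` lets the runner distinguish "no limit requested" from an explicit duration.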