[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83703=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83703
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 17:28
Start Date: 23/Mar/18 17:28
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375741821
 
 
   Yes, I created a jira 
[BEAM-3922](https://issues.apache.org/jira/browse/BEAM-3922) to track 
PostCommit failure. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 83703)
Time Spent: 8h 50m  (was: 8h 40m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83702=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83702
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 17:28
Start Date: 23/Mar/18 17:28
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375741821
 
 
   Yes, I created a jira 
[BEAM-3922](https://issues.apache.org/jira/browse/BEAM-3922) to track 
PostCommit failure. 


Issue Time Tracking
---

Worklog Id: (was: 83702)
Time Spent: 8h 40m  (was: 8.5h)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83704=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83704
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 17:28
Start Date: 23/Mar/18 17:28
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375741821
 
 
   Yes, I created a jira 
[BEAM-3922](https://issues.apache.org/jira/browse/BEAM-3922) to track this 
PostCommit failure. 


Issue Time Tracking
---

Worklog Id: (was: 83704)
Time Spent: 9h  (was: 8h 50m)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83692=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83692
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 17:22
Start Date: 23/Mar/18 17:22
Worklog Time Spent: 10m 
  Work Description: aaltay closed pull request #4930: [BEAM-3861] Complete 
streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index a95e5fa8f53..04e6e4e7370 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -23,13 +23,17 @@
 Currently, this test blocks until the job is manually terminated.
 """
 
+import datetime
 import logging
+import random
 import unittest
+import uuid
 
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
 from apache_beam.examples import streaming_wordcount
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
 from apache_beam.runners.runner import PipelineState
 from apache_beam.testing import test_utils
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -47,17 +51,16 @@ class StreamingWordCountIT(unittest.TestCase):
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.uuid = str(uuid.uuid4())
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(
-        project=self.test_pipeline.get_option('project'))
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
-
-    self._cleanup_pubsub()
+    self.pubsub_client = pubsub.Client(project=self.project)
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
 
     self.input_topic.create()
     self.output_topic.create()
@@ -65,6 +68,11 @@ def setUp(self):
     self.input_sub.create()
     self.output_sub.create()
 
+  def _generate_identifier(self):
+    seed = random.randint(0, 999)
+    current_time = datetime.datetime.now().strftime('%m%d%H%M%S')
+    return '%s%d' % (current_time, seed)
+
   def _inject_numbers(self, topic, num_messages):
     """Inject numbers as test data to PubSub."""
     logging.debug('Injecting %d numbers to topic %s',
@@ -79,13 +87,21 @@ def _cleanup_pubsub(self):
   def tearDown(self):
     self._cleanup_pubsub()
 
-  @attr('developing_test')
+  @attr('IT')
   def test_streaming_wordcount_it(self):
+    # Build expected dataset.
+    expected_msg = [('%d: 1' % num) for num in range(DEFAULT_INPUT_NUMBERS)]
+
     # Set extra options to the pipeline for test purpose
-    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
-    extra_opts = {'input_sub': self.input_sub.full_name,
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+    pubsub_msg_verifier = PubSubMessageMatcher(self.project,
+                                               OUTPUT_SUB + self.uuid,
+                                               expected_msg,
+                                               timeout=400)
+    extra_opts = {'input_subscription': self.input_sub.full_name,
                   'output_topic': self.output_topic.full_name,
-                  'on_success_matcher': all_of(*pipeline_verifiers)}
+                  'on_success_matcher': all_of(state_verifier,
+                                               pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
     test_utils.wait_for_subscriptions_created([self.input_sub])
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
new file mode 100644
index 000..8fb687908d8
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional 

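In the merged test, `expected_msg` is built as `['%d: 1' % num for num in range(DEFAULT_INPUT_NUMBERS)]`. Each injected number is a distinct word that streaming wordcount should see exactly once. The following is a minimal local sketch of that reasoning. It assumes each number is published once as its own message and that output lines take the `'<word>: <count>'` shape the expected list implies. `expected_wordcount_output` and `expected_msg_as_in_test` are hypothetical helpers. The real value of `DEFAULT_INPUT_NUMBERS` is not shown in the diff, so any count used with them is illustrative.

```python
from collections import Counter


def expected_wordcount_output(num_messages):
  """Count each injected number as a word and format '<word>: <count>'.

  Hypothetical helper: assumes every number 0..num_messages-1 is published
  once as its own message and treated as a single word.
  """
  counts = Counter(str(num) for num in range(num_messages))
  return ['%s: %d' % (word, count) for word, count in counts.items()]


def expected_msg_as_in_test(num_messages):
  # Same list comprehension the test uses to build expected_msg.
  return [('%d: 1' % num) for num in range(num_messages)]
```

Streaming output arrives from Pub/Sub in no particular order, so the two lists are best compared as `Counter` multisets rather than as ordered lists. `PubSubMessageMatcher._matches` uses the same approach.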
[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83691=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83691
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 17:21
Start Date: 23/Mar/18 17:21
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #4930: [BEAM-3861] Complete 
streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375739739
 
 
   Thank you @markflyhigh, I will merge this.
   
   It looks like there is a bug related to recently introduced 
https://github.com/apache/beam/pull/4781 (I will file the bug.)


Issue Time Tracking
---

Worklog Id: (was: 83691)
Time Spent: 8h 20m  (was: 8h 10m)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83628=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83628
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 15:54
Start Date: 23/Mar/18 15:54
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375712049
 
 
   From the [console log](https://builds.apache.org/job/beam_PostCommit_Python_Verify/4487/consoleFull):
   
   ```
   test_streaming_wordcount_it (apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT) ... ok
   test_wordcount_fnapi_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ok
   test_bigquery_tornadoes_it (apache_beam.examples.cookbook.bigquery_tornadoes_it_test.BigqueryTornadoesIT) ... ok
   test_wordcount_it (apache_beam.examples.wordcount_it_test.WordCountIT) ... ERROR
   ```
   `test_wordcount_it` failed with a "no attribute" error, which has caused the Jenkins PostCommit to fail since Mar 21st. I haven't had a chance to look into that yet, but it doesn't affect `test_streaming_wordcount_it`.


Issue Time Tracking
---

Worklog Id: (was: 83628)
Time Spent: 8h  (was: 7h 50m)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83485=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83485
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 06:23
Start Date: 23/Mar/18 06:23
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #4930: [BEAM-3861] Complete 
streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375556990
 
 
   Do you know why the PostCommit tests failed?


Issue Time Tracking
---

Worklog Id: (was: 83485)
Time Spent: 7h 50m  (was: 7h 40m)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83467=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83467
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 05:12
Start Date: 23/Mar/18 05:12
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375547198
 
 
   Run Python PostCommit


Issue Time Tracking
---

Worklog Id: (was: 83467)
Time Spent: 7h 40m  (was: 7.5h)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83413=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83413
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 00:52
Start Date: 23/Mar/18 00:52
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #4930: [BEAM-3861] Complete 
streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375504394
 
 
   LGTM. Please squash, I can merge after tests pass.


Issue Time Tracking
---

Worklog Id: (was: 83413)
Time Spent: 7.5h  (was: 7h 20m)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83396
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 00:05
Start Date: 23/Mar/18 00:05
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375497207
 
 
   @aaltay PTAL


Issue Time Tracking
---

Worklog Id: (was: 83396)
Time Spent: 7h 20m  (was: 7h 10m)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83395=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83395
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 00:03
Start Date: 23/Mar/18 00:03
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375496804
 
 
   Run Python PostCommit


Issue Time Tracking
---

Worklog Id: (was: 83395)
Time Spent: 7h 10m  (was: 7h)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83393=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83393
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 00:01
Start Date: 23/Mar/18 00:01
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176607403
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
 
 Review comment:
   done.
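`_matches` compares `Counter(self.messages) == Counter(self.expected_msg)`. This is a multiset comparison: arrival order is ignored, but a duplicated or unexpected message makes the match fail. The stdlib-only sketch below illustrates that behavior and the set difference that `describe_mismatch` starts from. The function names and sample messages are illustrative and are not part of `pubsub_matcher.py`.

```python
from collections import Counter


def messages_match(received, expected):
  """Order-insensitive but duplicate-sensitive, like _matches."""
  return Counter(received) == Counter(expected)


def missing_messages(received, expected):
  """Expected messages never received, via the same set difference that
  describe_mismatch begins with; duplicates are invisible to this check."""
  return set(expected) - set(received)
```

There is an asymmetry here. A duplicate delivery fails the `Counter` check but leaves the set difference empty, so a mismatch description built only from that difference may not explain a failure caused by duplicates alone.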


Issue Time Tracking
---

Worklog Id: (was: 83393)
Time Spent: 6h 50m  (was: 6h 40m)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83394=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83394
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 00:01
Start Date: 23/Mar/18 00:01
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176607424
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription."""
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        subscription.acknowledge([ack_id])
+      if len(total_messages) >= expected_num:
+        return total_messages
+      time.sleep(1)
+
+    raise RuntimeError('Timeout after %d sec. Received %d messages from %s.' %
+                       (timeout, len(total_messages), subscription.full_name))
+
+  def describe_to(self, description):
+    description.append_text(
+        'Expected %d messages.' % len(self.expected_msg))
+
+  def describe_mismatch(self, _, mismatch_description):
+    diff = set(self.expected_msg) - set(self.messages)
 
 Review comment:
   done
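`_wait_for_messages` works as follows:

* It pulls batches of up to `MAX_MESSAGES_IN_ONE_PULL` messages and acknowledges each one.
* It returns once at least `expected_num` messages have arrived.
* It raises `RuntimeError` if `timeout` seconds pass first.

The sketch below reproduces that loop against a hypothetical in-memory `FakeSubscription` (not part of Beam or `google.cloud.pubsub`), so it runs without Pub/Sub. The `pull`/`acknowledge` shapes mirror only how the matcher calls them, and the `max_per_pull` and `poll_interval` parameters are additions for the sketch.

```python
import time
from collections import deque, namedtuple

# Hypothetical stand-in for a pulled Pub/Sub message; only .data is read.
FakeMessage = namedtuple('FakeMessage', ['data'])


class FakeSubscription(object):
  """Hypothetical in-memory subscription mimicking the matcher's calls."""

  def __init__(self, full_name, batches):
    self.full_name = full_name
    self._batches = deque(batches)  # each pull() serves the next batch
    self._ack_counter = 0
    self.acked = []

  def pull(self, max_messages):
    """Return up to max_messages (ack_id, message) pairs."""
    if not self._batches:
      return []
    batch = self._batches.popleft()
    head, tail = batch[:max_messages], batch[max_messages:]
    if tail:
      self._batches.appendleft(tail)  # keep undelivered messages for later
    pairs = []
    for data in head:
      self._ack_counter += 1
      pairs.append(('ack-%d' % self._ack_counter, FakeMessage(data)))
    return pairs

  def acknowledge(self, ack_ids):
    self.acked.extend(ack_ids)


def wait_for_messages(subscription, expected_num, timeout,
                      max_per_pull=50, poll_interval=1):
  """Pull until expected_num messages arrive or timeout seconds elapse."""
  total_messages = []
  start_time = time.time()
  while time.time() - start_time <= timeout:
    pulled = subscription.pull(max_messages=max_per_pull)
    for ack_id, message in pulled:
      total_messages.append(message.data)
      subscription.acknowledge([ack_id])
    if len(total_messages) >= expected_num:
      return total_messages
    time.sleep(poll_interval)
  raise RuntimeError('Timeout after %d sec. Received %d messages from %s.' %
                     (timeout, len(total_messages), subscription.full_name))
```

The loop returns as soon as the count reaches `expected_num`. If a single pull overshoots, it therefore returns more messages than expected, and the `Counter` comparison in `_matches` then reports the mismatch.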


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83392
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 23/Mar/18 00:01
Start Date: 23/Mar/18 00:01
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176607385
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -47,24 +50,32 @@ class StreamingWordCountIT(unittest.TestCase):
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.identifier = self._generate_identifier()
 
 Review comment:
   done.
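This review thread is about per-run resource names, which keep concurrent test runs from sharing topics and subscriptions. At review time `setUp` used `self._generate_identifier()`, which builds a month-to-second timestamp followed by a random integer in 0-999. The merged diff suffixes names with `str(uuid.uuid4())` instead. The sketch below compares the two schemes. `INPUT_TOPIC`'s real value is not shown in the diff, so `'wc_topic_input'` is a hypothetical placeholder. The optional `now` argument and the `per_run_name` helper are also additions for illustration.

```python
import datetime
import random
import uuid

# Hypothetical base name; the real INPUT_TOPIC value is not shown in the diff.
INPUT_TOPIC = 'wc_topic_input'


def generate_identifier(now=None):
  """Month-to-second timestamp followed by a random integer in [0, 999].

  Mirrors _generate_identifier from the diff; `now` is an added parameter
  so the output can be checked deterministically.
  """
  now = now or datetime.datetime.now()
  seed = random.randint(0, 999)
  return '%s%d' % (now.strftime('%m%d%H%M%S'), seed)


def uuid_suffix():
  """Per-run suffix used by the merged setUp: a random UUID4 string."""
  return str(uuid.uuid4())


def per_run_name(base, suffix):
  """Hypothetical helper: append a per-run suffix to a resource name."""
  return base + suffix
```

With the timestamp scheme, two runs that start in the same second collide with probability 1/1000. A UUID4 suffix makes collisions negligible.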


Issue Time Tracking
---

Worklog Id: (was: 83392)
Time Spent: 6h 40m  (was: 6.5h)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83391=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83391
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 23:56
Start Date: 22/Mar/18 23:56
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176606739
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
 
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
 
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
 
 Review comment:
   I missed the success matchers above. This is fine as it is.
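The reworked `run_pipeline` proceeds in three steps:

1. A batch job is waited on with `wait_until_finish()`. A streaming job is waited on only until it reaches `RUNNING`.
2. The `on_success_matcher` is asserted.
3. Streaming jobs are cancelled.

The sketch below models that control flow with a hypothetical `FakeResult`. The implementation of `wait_until_in_state` is not shown in the diff, so the polling loop, its timeout and interval here are assumptions. The matcher is modeled as a plain callable rather than a hamcrest matcher to keep the example stdlib-only.

```python
import time


class FakeResult(object):
  """Hypothetical job handle; each state poll advances a scripted sequence."""

  def __init__(self, states):
    self._states = list(states)
    self.finished = False
    self.cancelled = False

  def current_state(self):
    # Pop scripted states until only the last one remains, then repeat it.
    if len(self._states) > 1:
      return self._states.pop(0)
    return self._states[0]

  def wait_until_finish(self):
    self.finished = True

  def cancel(self):
    self.cancelled = True


def wait_until_in_state(result, expected_state, timeout=60, interval=0.01):
  """Poll until the job reports expected_state (assumed polling behavior)."""
  start = time.time()
  while time.time() - start < timeout:
    if result.current_state() == expected_state:
      return
    time.sleep(interval)
  raise RuntimeError('Job did not reach %s within %d sec.' %
                     (expected_state, timeout))


def run_and_verify(result, streaming, matcher=None):
  """Wait for the job, check the matcher, then cancel streaming jobs."""
  if not streaming:
    result.wait_until_finish()
  else:
    wait_until_in_state(result, 'RUNNING')
  if matcher is not None and not matcher(result):
    raise AssertionError('on_success_matcher did not match')
  if streaming:
    result.cancel()  # as in the diff, reached only if the matcher passed
```

In the diff, as in this sketch, `cancel()` runs only after the matcher passes. A failed assertion would leave the streaming job running. Wrapping the verification in `try`/`finally` is one possible variation; it is not what the diff does.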


Issue Time Tracking
---

Worklog Id: (was: 83391)
Time Spent: 6.5h  (was: 6h 20m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>




[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83338=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83338
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 21:04
Start Date: 22/Mar/18 21:04
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176571812
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
 
 Review comment:
   I just realized this method returns the received messages, so I will return
them instead of raising an exception in all cases.
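A minimal sketch of the behaviour described in this comment follows. It assumes a hypothetical in-memory `FakeSubscription` whose `pull()` and `acknowledge()` mirror the calls in the diff; it is not the google-cloud-pubsub client. On timeout, the loop returns the partial list instead of raising, so the exact comparison in the matcher evaluates to `False`. `wait_for_messages`, `matches` and `poll_interval` are illustrative names, not the code that was merged.

```python
import time
from collections import Counter, namedtuple

MAX_MESSAGES_IN_ONE_PULL = 50

# Hypothetical message type: only the .data attribute is read below.
Message = namedtuple('Message', ['data'])


class FakeSubscription(object):
  """Hypothetical stand-in offering pull()/acknowledge() as used in the diff."""

  def __init__(self, payloads):
    self._pending = [('ack-%d' % i, Message(p)) for i, p in enumerate(payloads)]
    self.acked = []

  def pull(self, max_messages):
    batch = self._pending[:max_messages]
    self._pending = self._pending[max_messages:]
    return batch

  def acknowledge(self, ack_ids):
    self.acked.extend(ack_ids)


def wait_for_messages(subscription, expected_num, timeout, poll_interval=1):
  """Pull until expected_num messages arrive or timeout expires; never raises."""
  total_messages = []
  start_time = time.time()
  while time.time() - start_time <= timeout:
    pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
    for ack_id, message in pulled:
      total_messages.append(message.data)
      subscription.acknowledge([ack_id])
    if len(total_messages) >= expected_num:
      break
    time.sleep(poll_interval)
  # On timeout, hand back the partial result instead of raising.
  return total_messages


def matches(subscription, expected_msg, timeout, poll_interval=1):
  """Exact multiset comparison; a timeout simply yields False."""
  received = wait_for_messages(subscription, len(expected_msg), timeout,
                               poll_interval)
  return Counter(received) == Counter(expected_msg)
```

Returning instead of raising keeps a timeout on the matcher's ordinary `False` path, rather than aborting the assertion with an exception.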


Issue Time Tracking
---

Worklog Id: (was: 83338)
Time Spent: 6h 20m  (was: 6h 10m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>




[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83312=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83312
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 19:54
Start Date: 22/Mar/18 19:54
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176552990
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
 
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
 
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
 
 Review comment:
   But if you cancel before you start the verification process, the pipeline
may still not have been able to process the data.
   
   I think we can simply sleep a bit before calling cancel. (Also note that the
pipeline reaches the running state before workers are actually doing meaningful
work. They still need to install the SDK, etc.)
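The following is a rough sketch of the ordering discussed here, under stated assumptions. It uses an optional fixed sleep before verification (the suggestion above), matchers that may block on their own, and `cancel()` in a `finally` block, so the streaming job is always stopped. `FakeResult`, `verify_then_cancel` and `settle_seconds` are hypothetical names, not part of Beam's `TestDataflowRunner`.

```python
import time


class FakeResult(object):
  """Hypothetical stand-in for a pipeline result; only cancel() is used."""

  def __init__(self):
    self.cancelled = False

  def cancel(self):
    self.cancelled = True


def verify_then_cancel(result, matchers, settle_seconds=0):
  """Give a streaming job time to process data, verify it, then cancel it.

  settle_seconds models the fixed sleep suggested in the review; matchers
  that poll (such as a PubSub matcher) add their own waiting on top of it.
  Every matcher is evaluated, and cancel() runs in `finally` so the job is
  stopped even if a matcher raises.
  """
  try:
    if settle_seconds > 0:
      time.sleep(settle_seconds)
    results = [matcher(result) for matcher in matchers]
    return all(results)
  finally:
    result.cancel()
```

A fixed sleep only approximates "workers are doing useful work", because the running state is reported before workers finish setting up. A polling matcher with a timeout adapts to the actual startup time instead.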


Issue Time Tracking
---

Worklog Id: (was: 83312)
Time Spent: 6h 10m  (was: 6h)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>




[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83283=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83283
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 18:37
Start Date: 22/Mar/18 18:37
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176530410
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
 
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
 
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
 
 Review comment:
   I see. A customized matcher is probably one approach for this. With
`PubSubMessageMatcher`, for example, verification blocks until the messages are
pulled (or a timeout occurs), so wait logic inside `on_success_matcher` can give
the pipeline time to process the data. However, if no `on_success_matcher` is
provided, cancel will be called immediately after setup.
   
   Do we want a waiting step here as a general step of the test?


Issue Time Tracking
---

Worklog Id: (was: 83283)
Time Spent: 6h  (was: 5h 50m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>




[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83260=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83260
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 17:42
Start Date: 22/Mar/18 17:42
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176489484
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription."""
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        subscription.acknowledge([ack_id])
+      if len(total_messages) >= expected_num:
 
 Review comment:
   Supporting only full-output matching is what I intend to do. I found it hard
to know whether more output will appear after all expected data has been
pulled, and I try to avoid adding extra wait time after that point, which may be
inefficient in some cases. So in `_matches` (line 82), I verify an exact match
of the two datasets.
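The trade-off described here can be illustrated with `collections.Counter`, which the matcher's `_matches` uses. Comparing two Counters ignores ordering but fails on extra or missing duplicates. A looser "at least the expected messages" check would tolerate extra output. `subset_match` is a hypothetical alternative shown only for contrast; the PR uses the exact form.

```python
from collections import Counter


def exact_match(received, expected):
  """Order-insensitive, but duplicates and extra messages both matter."""
  return Counter(received) == Counter(expected)


def subset_match(received, expected):
  """Looser, hypothetical alternative: extra messages are tolerated."""
  # Counter subtraction keeps only positive counts, i.e. what is still missing.
  missing = Counter(expected) - Counter(received)
  return not missing


print(exact_match(['b', 'a'], ['a', 'b']))        # True
print(exact_match(['a', 'b', 'c'], ['a', 'b']))   # False: extra output
print(subset_match(['a', 'b', 'c'], ['a', 'b']))  # True
```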


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83252=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83252
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 17:15
Start Date: 22/Mar/18 17:15
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176502035
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
 
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
 
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
 
 Review comment:
   Because we want the pipeline to work for a bit after setup is completed.
setup -> cancel does not really allow any time for the pipeline to process data.


Issue Time Tracking
---

Worklog Id: (was: 83252)
Time Spent: 5h 40m  (was: 5.5h)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>




[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83237=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83237
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 16:47
Start Date: 22/Mar/18 16:47
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176492223
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
 
 Review comment:
   Looks like returning `False` is better, since the rest of the matchers can
then still be executed instead of breaking here, and the pipeline will be
canceled at the end of the test. Will fix.


Issue Time Tracking
---

Worklog Id: (was: 83237)
Time Spent: 5.5h  (was: 5h 20m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>




[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83227=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83227
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 16:41
Start Date: 22/Mar/18 16:41
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176489484
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription."""
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        subscription.acknowledge([ack_id])
+      if len(total_messages) >= expected_num:
 
 Review comment:
   Supporting only full-output matching is what I intend to do. I found it hard
to know whether more output will appear after all expected data has been
pulled, and I try to avoid adding extra wait time after that point, which may be
inefficient in some cases. So in `_matches` (line 82), I verify an exact match
of the two datasets.


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83228=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83228
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 16:41
Start Date: 22/Mar/18 16:41
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176490292
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
 
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
 
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
 
 Review comment:
   Sure. Can you explain more about why we want to add a wait? My understanding
is that it's better to call cancel after the workers have successfully started,
right?


Issue Time Tracking
---

Worklog Id: (was: 83228)
Time Spent: 5h 10m  (was: 5h)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>




[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83229=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83229
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 16:41
Start Date: 22/Mar/18 16:41
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176489600
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription."""
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        subscription.acknowledge([ack_id])
+      if len(total_messages) >= expected_num:
+        return total_messages
+      time.sleep(1)
+
+    raise RuntimeError('Timeout after %d sec. Received %d messages from %s.' %
+                       (timeout, len(total_messages), subscription.full_name))
+
+  def describe_to(self, description):
+    description.append_text(
+        'Expected %d messages.' % len(self.expected_msg))
+
+  def describe_mismatch(self, _, mismatch_description):
+    diff = set(self.expected_msg) - set(self.messages)
 
 Review comment:
   good point! will change.
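In the hunk above, `_matches` compares `Counter`s, but `describe_mismatch` computes a `set` difference, which cannot report a missing duplicate. A minimal sketch of a `Counter`-based mismatch report follows; `describe_counter_mismatch` is a hypothetical helper, not code from the PR.

```python
from collections import Counter


def describe_counter_mismatch(expected, actual):
  """Return (missing, unexpected) as Counters, respecting duplicates.

  Counter subtraction keeps only positive counts, so each result lists
  how many copies of a message are missing or extra.
  """
  expected_counts = Counter(expected)
  actual_counts = Counter(actual)
  missing = expected_counts - actual_counts
  unexpected = actual_counts - expected_counts
  return missing, unexpected


# set(['x', 'x']) - set(['x']) is empty and hides the missing duplicate;
# the Counter difference reports it.
missing, unexpected = describe_counter_mismatch(['x', 'x'], ['x'])
print(dict(missing), dict(unexpected))  # {'x': 1} {}
```

Using the same multiset semantics in both `_matches` and `describe_mismatch` keeps the failure message consistent with the reason the match failed.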


This is an automated message from the Apache Git Service.
To respond to the message, please log 

[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83058=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83058
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 04:19
Start Date: 22/Mar/18 04:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176304644
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
 
 Review comment:
   Should this return `False` in case of a timeout, instead of raising an 
exception?
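One way to follow this suggestion, sketched under assumptions: the polling loop returns whatever it has collected when the deadline passes, and the match step compares `Counter`s, so a timeout becomes an ordinary failed match (`False`) that hamcrest can report through `describe_mismatch`. `pull_batch` is a hypothetical stand-in for a subscription pull that returns message payloads (acknowledgement is omitted), and `now`/`sleep` are injectable only so the sketch runs without Pub/Sub or real waiting.

```python
import time
from collections import Counter


def wait_for_messages(pull_batch, expected_num, timeout,
                      now=time.time, sleep=time.sleep):
  """Pull until expected_num messages arrive or timeout seconds elapse.

  Unlike the reviewed _wait_for_messages, this returns whatever was
  collected when the deadline passes instead of raising RuntimeError.
  """
  collected = []
  start = now()
  while now() - start <= timeout:
    collected.extend(pull_batch())
    if len(collected) >= expected_num:
      break
    sleep(1)
  return collected


def matches(expected, pull_batch, timeout, now=time.time, sleep=time.sleep):
  """Return False, rather than raising, when messages are missing."""
  actual = wait_for_messages(pull_batch, len(expected), timeout,
                             now=now, sleep=sleep)
  return Counter(actual) == Counter(expected)


class FakeClock(object):
  """Deterministic clock so the sketch runs without real waiting."""

  def __init__(self):
    self.t = 0.0

  def now(self):
    return self.t

  def sleep(self, seconds):
    self.t += seconds


clock = FakeClock()
# Nothing ever arrives: the match fails after the timeout, with no exception.
print(matches(['a'], lambda: [], timeout=3, now=clock.now, sleep=clock.sleep))
```

The trade-off is diagnostics: raising makes the timeout explicit in the test log, while returning `False` lets the matcher's own description explain which messages never arrived.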




Issue Time Tracking
---

Worklog Id: (was: 83058)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83057=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83057
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 04:19
Start Date: 22/Mar/18 04:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176304742
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription."""
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        subscription.acknowledge([ack_id])
+      if len(total_messages) >= expected_num:
 
 Review comment:
   This is a little strange. This matcher will only be useful when testing for 
the full output. It would not be useful for the kind of use case where you test, 
say, that elements x, y, z are part of the whole output.
   
   Is this what you intended to do?
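If the matcher were meant to cover the subset use case the reviewer describes, the comparison could be relaxed to a multiset containment check. This is a hypothetical variant, not what the PR implements; `contains_all` and `equals_exactly` are illustrative names.

```python
from collections import Counter


def contains_all(expected, actual):
  """True if every expected message appears in actual at least as often.

  Counter subtraction drops non-positive counts, so an empty result means
  actual covers expected; extra messages in actual are ignored.
  """
  return not (Counter(expected) - Counter(actual))


def equals_exactly(expected, actual):
  """The PR's current behavior: same messages, same multiplicities."""
  return Counter(expected) == Counter(actual)


output = ['x', 'y', 'z', 'w']
print(contains_all(['x', 'y', 'z'], output))    # True
print(equals_exactly(['x', 'y', 'z'], output))  # False
```

With subset semantics, the stop condition in `_wait_for_messages` (`len(total_messages) >= expected_num`) would also need to change, for example to stop once `contains_all` holds, because with unrelated messages in the output, reaching the expected count no longer implies that every expected message has arrived.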




Issue Time Tracking
---

Worklog Id: (was: 83057)

> Build test 

[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83060=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83060
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 04:19
Start Date: 22/Mar/18 04:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176304852
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
+      raise ValueError('Invalid expected messages %s.' % expected_msg)
+
+    self.project = project
+    self.sub_name = sub_name
+    self.expected_msg = expected_msg
+    self.timeout = timeout
+    self.messages = None
+
+  def _matches(self, _):
+    if not self.messages:
+      subscription = (pubsub
+                      .Client(project=self.project)
+                      .subscription(self.sub_name))
+      self.messages = self._wait_for_messages(subscription,
+                                              len(self.expected_msg),
+                                              self.timeout)
+    return Counter(self.messages) == Counter(self.expected_msg)
+
+  def _wait_for_messages(self, subscription, expected_num, timeout):
+    """Wait for messages from given subscription."""
+    logging.debug('Start pulling messages from %s', subscription.full_name)
+    total_messages = []
+    start_time = time.time()
+    while time.time() - start_time <= timeout:
+      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
+      for ack_id, message in pulled:
+        total_messages.append(message.data)
+        subscription.acknowledge([ack_id])
+      if len(total_messages) >= expected_num:
+        return total_messages
+      time.sleep(1)
+
+    raise RuntimeError('Timeout after %d sec. Received %d messages from %s.' %
+                       (timeout, len(total_messages), subscription.full_name))
+
+  def describe_to(self, description):
+    description.append_text(
+        'Expected %d messages.' % len(self.expected_msg))
+
+  def describe_mismatch(self, _, mismatch_description):
+    diff = set(self.expected_msg) - set(self.messages)
 
 Review comment:
   Don't you want to use a counter here too? That is what you used for matching. 
(For example, if you are checking for messages x, x and only x appears, it will 
mismatch (correctly), however 

[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83056=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83056
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 04:19
Start Date: 22/Mar/18 04:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176304163
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
 ##
 @@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PubSub verifier used for end-to-end test."""
+
+import logging
+import time
+from collections import Counter
+
+from hamcrest.core.base_matcher import BaseMatcher
+
+__all__ = ['PubSubMessageMatcher']
+
+
+# Protect against environments where pubsub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+DEFAULT_TIMEOUT = 5 * 60
+MAX_MESSAGES_IN_ONE_PULL = 50
+
+
+class PubSubMessageMatcher(BaseMatcher):
+  """Matcher that verifies messages from given subscription.
+
+  This matcher can block the test and keep pulling messages from given
+  subscription until all expected messages are shown or timeout.
+  """
+
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+    """Initialize PubSubMessageMatcher object.
+
+    Args:
+      project: A name string of project.
+      sub_name: A name string of subscription which is attached to output.
+      expected_msg: A string list that contains expected message data pulled
+        from the subscription.
+      timeout: Timeout in seconds to wait for all expected messages appears.
+    """
+    if pubsub is None:
+      raise ImportError(
+          'PubSub dependencies are not installed.')
+    if not project:
+      raise ValueError('Invalid project %s.' % project)
+    if not sub_name:
+      raise ValueError('Invalid subscription %s.' % sub_name)
+    if not expected_msg or not isinstance(expected_msg, list):
 
 Review comment:
   `if not isinstance(expected_msg, list):` should be enough?
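The two conditions differ only for an empty list: `isinstance([], list)` is `True`, so `if not isinstance(expected_msg, list)` alone would accept `[]`, while the original condition also rejects it. Whether an empty expectation should be allowed is a design decision; the sketch below uses hypothetical `validate_*` helpers that mirror the two conditions to make the difference concrete.

```python
def validate_strict(expected_msg):
  """Mirrors the PR's check: reject non-lists and empty lists."""
  if not expected_msg or not isinstance(expected_msg, list):
    raise ValueError('Invalid expected messages %s.' % expected_msg)


def validate_type_only(expected_msg):
  """The reviewer's suggestion: reject only non-lists."""
  if not isinstance(expected_msg, list):
    raise ValueError('Invalid expected messages %s.' % expected_msg)


def accepts(validator, value):
  """Return True if validator does not raise for value."""
  try:
    validator(value)
    return True
  except ValueError:
    return False


print(accepts(validate_strict, []))      # False
print(accepts(validate_type_only, []))   # True
print(accepts(validate_type_only, 'a'))  # False
```

Note that with an empty list, `_wait_for_messages` in the hunk above would return right after its first pull, because `len(total_messages) >= 0` always holds.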




Issue Time Tracking
---

Worklog Id: (was: 83056)
Time Spent: 5h  (was: 4h 50m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83055=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83055
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 04:19
Start Date: 22/Mar/18 04:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176305077
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
 
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
 
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
 
 Review comment:
   Do you want to cancel the job immediately after it reaches the running 
state? Should we wait a bit first?
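For context on the ordering in this hunk: the streaming branch first waits for `PipelineState.RUNNING`, then runs `on_success_matcher` through `hc_assert_that`, and only then calls `self.result.cancel()`, so a blocking matcher such as `PubSubMessageMatcher` delays the cancel until it has seen the expected messages or timed out. The sketch below models that control flow with plain callables; `get_state`, `verify`, and `cancel` are hypothetical stand-ins, not the `TestDataflowRunner` API, and the `try`/`finally` that cancels even when verification fails is an assumption, not what the diff does.

```python
import time


def wait_until_in_state(get_state, target, timeout, interval=1.0,
                        now=time.time, sleep=time.sleep):
  """Poll get_state() until it returns target; raise if timeout elapses."""
  start = now()
  while now() - start <= timeout:
    if get_state() == target:
      return
    sleep(interval)
  raise RuntimeError('Job did not reach %s within %d sec.' % (target, timeout))


def run_streaming_check(get_state, verify, cancel, timeout=600,
                        now=time.time, sleep=time.sleep):
  """Wait for RUNNING, run the (possibly blocking) verifier, then cancel."""
  wait_until_in_state(get_state, 'RUNNING', timeout, now=now, sleep=sleep)
  try:
    verify()  # e.g. blocks until the expected output has been observed
  finally:
    cancel()  # assumption: cancel even if verification raises
```

In this shape, "waiting a bit" before cancelling is the verifier's job: the cancel is only reached once the verifier returns or fails.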




Issue Time Tracking
---

Worklog Id: (was: 83055)
Time Spent: 4h 50m  (was: 4h 40m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83059=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83059
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 04:19
Start Date: 22/Mar/18 04:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176303844
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -47,24 +50,32 @@ class StreamingWordCountIT(unittest.TestCase):
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.identifier = self._generate_identifier()
 
 Review comment:
   You could use uuid instead.
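A minimal sketch of the reviewer's suggestion: derive per-run Pub/Sub resource names from `uuid.uuid4()` instead of a custom `_generate_identifier()`. The name constants are copied from the earlier version of this test (PR #4874); `unique_names` itself is a hypothetical helper. Pub/Sub topic and subscription IDs must start with a letter, so the UUID is appended as a suffix rather than used on its own.

```python
import uuid

INPUT_TOPIC = 'wc_topic_input'
OUTPUT_TOPIC = 'wc_topic_output'
INPUT_SUB = 'wc_subscription_input'
OUTPUT_SUB = 'wc_subscription_output'


def unique_names(uid=None):
  """Return per-run topic/subscription names sharing one random suffix."""
  uid = uid or uuid.uuid4().hex  # 32 lowercase hex characters
  return {name: '%s_%s' % (name, uid)
          for name in (INPUT_TOPIC, OUTPUT_TOPIC, INPUT_SUB, OUTPUT_SUB)}


names = unique_names()
print(names[INPUT_TOPIC])  # e.g. wc_topic_input_<32 hex chars>
```

Sharing one suffix across the four resources makes it easy to find and delete everything a single test run created.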




Issue Time Tracking
---

Worklog Id: (was: 83059)
Time Spent: 5h  (was: 4h 50m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83001=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83001
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 00:33
Start Date: 22/Mar/18 00:33
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375140129
 
 
   +R: @aaltay PTAL




Issue Time Tracking
---

Worklog Id: (was: 83001)
Time Spent: 4.5h  (was: 4h 20m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83002
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 00:33
Start Date: 22/Mar/18 00:33
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4930: [BEAM-3861] 
Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#issuecomment-375140129
 
 
   +R: @aaltay Ready to review. PTAL




Issue Time Tracking
---

Worklog Id: (was: 83002)
Time Spent: 4h 40m  (was: 4.5h)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83000=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83000
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 22/Mar/18 00:30
Start Date: 22/Mar/18 00:30
Worklog Time Spent: 10m 
  Work Description: markflyhigh opened a new pull request #4930: 
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930
 
 
   Complete Python end-to-end testing framework to support streaming pipelines 
that use PubSub. 
   
   Improvements are:
   - Add `PubSubMessageMatcher`, a customized hamcrest matcher that pulls messages from a given subscription and verifies their content.
   - Have the test framework auto-cancel the streaming pipeline after all matchers are verified.
   - Make a few minor improvements and cleanups in `TestDataflowRunner`.
   
   After this change, `StreamingWordCountIT` can verify output messages and cancel the pipeline. The overall test workflow is:
   
   1. Set up PubSub topics and subscriptions.
   1. Build test options, including the required pipeline options and test verifiers.
   1. Inject data into the input topic.
   1. Call `StreamingWordCount.run` with the above options to start the pipeline.
   1. Wait until the pipeline starts running.
   1. Run verifiers.
   1. Cancel the streaming job.
   1. Clean up PubSub topics and subscriptions.
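The eight steps above can be modeled as a skeleton in which setup runs first and teardown (cancelling the job and cleaning up Pub/Sub) runs even when verification fails. Every step here is a hypothetical placeholder that only records its name; the real test uses Pub/Sub clients and `StreamingWordCount.run`, and the nested `try`/`finally` is one assumed way to guarantee cleanup, not necessarily how the PR implements it.

```python
STEP_NAMES = [
    'setup_pubsub',        # 1. create topics and subscriptions
    'build_options',       # 2. pipeline options plus verifiers
    'inject_input',        # 3. publish test data to the input topic
    'start_pipeline',      # 4. StreamingWordCount.run(...)
    'wait_until_running',  # 5. block until the job is RUNNING
    'run_verifiers',       # 6. e.g. a PubSubMessageMatcher
    'cancel_job',          # 7. streaming jobs keep running until cancelled
    'cleanup_pubsub',      # 8. delete topics and subscriptions
]


def make_placeholder_steps(log, fail_on=None):
  """Build hypothetical steps that record their name in log.

  The step named fail_on raises AssertionError to simulate a failure.
  """
  def make_step(name):
    def step():
      log.append(name)
      if name == fail_on:
        raise AssertionError('%s failed' % name)
    return step
  return {name: make_step(name) for name in STEP_NAMES}


def run_streaming_it(steps):
  """Run the workflow, always cancelling the job and cleaning up."""
  steps['setup_pubsub']()
  try:
    steps['build_options']()
    steps['inject_input']()
    steps['start_pipeline']()
    steps['wait_until_running']()
    try:
      steps['run_verifiers']()
    finally:
      steps['cancel_job']()
  finally:
    steps['cleanup_pubsub']()


log = []
try:
  run_streaming_it(make_placeholder_steps(log, fail_on='run_verifiers'))
except AssertionError:
  pass
print(log[-2:])  # ['cancel_job', 'cleanup_pubsub']
```

Even when a verifier fails, the job is cancelled and the per-run topics and subscriptions are deleted, so a failing run does not leak a running streaming job.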
   
   Note: If the two other improvement PRs 
([PR-4921](https://github.com/apache/beam/pull/4921), 
[PR-4922](https://github.com/apache/beam/pull/4922)) are merged before this 
one, this PR must be rebased before merging.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [x] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [x] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   




Issue Time Tracking
---

Worklog Id: (was: 83000)
Time Spent: 4h 20m  (was: 4h 10m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82606=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82606
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 21/Mar/18 01:58
Start Date: 21/Mar/18 01:58
Worklog Time Spent: 10m 
  Work Description: aaltay closed pull request #4874: [BEAM-3861] Improve 
test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py 
b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 12f73510873..7ef95d85f1a 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -36,7 +36,7 @@
 
 def split_fn(lines):
   import re
-  return re.findall(r'[A-Za-z\']+', lines)
+  return re.findall(r'[A-Za-z0-9\']+', lines)
 
 
 def run(argv=None):
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py 
b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
new file mode 100644
index 000..a95e5fa8f53
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeine in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test blocks until the job is manually terminated.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('developing_test')
+  def test_streaming_wordcount_it(self):
+    # Set extra options to the pipeline for test purpose
+    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+    extra_opts = {'input_sub': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Generate input data and inject to PubSub.

[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82480=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82480
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 21:00
Start Date: 20/Mar/18 21:00
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175920707
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 Review comment:
   can you explain more about what other verification we can do here? Probably 
good to do it here.




Issue Time Tracking
---

Worklog Id: (was: 82480)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82477=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82477
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 20:53
Start Date: 20/Mar/18 20:53
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175918622
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
     return self.result
+
+  def _is_in_terminate_state(self, job_state):
 
 Review comment:
   sg, will change.




Issue Time Tracking
---

Worklog Id: (was: 82477)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82456=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82456
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 20:10
Start Date: 20/Mar/18 20:10
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175906266
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_utils.py
 ##
 @@ -129,3 +131,47 @@ def delete_files(file_paths):
     raise RuntimeError('Clean up failed. Invalid file path: %s.' %
                        file_paths)
   FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+  """Wait for all PubSub subscriptions are created."""
+  return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+  """Wait for all PubSub topics are created."""
+  return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+  need_wait = set(components)
+  start_time = time.time()
+  while time.time() - start_time <= timeout:
+    for c in components:
 
 Review comment:
   OK. Maybe add a TODO to clean this up later.




Issue Time Tracking
---

Worklog Id: (was: 82456)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82453=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82453
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 20:06
Start Date: 20/Mar/18 20:06
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175905248
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
     return self.result
+
+  def _is_in_terminate_state(self, job_state):
 
 Review comment:
   It is fair to make it public. (We can also use it later to address this 
todo: 
https://github.com/apache/beam/blob/b5041e9ab5c5fbb4435edb6fdcab78bc3483e86c/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L989)
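A minimal sketch of what a public terminal-state check could look like, under stated assumptions: to stay runnable without Beam installed, `PipelineState` here is a hypothetical local stand-in for `apache_beam.runners.runner.PipelineState` (state names only), and the terminal states are the six listed in the `_is_in_terminate_state` hunk of this PR. The name `is_in_terminal_state` is illustrative, not a confirmed Beam API.

```python
from enum import Enum


class PipelineState(Enum):
  """Hypothetical stand-in for Beam's PipelineState (state names only)."""
  STARTING = 'STARTING'
  RUNNING = 'RUNNING'
  STOPPED = 'STOPPED'
  DONE = 'DONE'
  FAILED = 'FAILED'
  CANCELLED = 'CANCELLED'
  UPDATED = 'UPDATED'
  DRAINING = 'DRAINING'
  DRAINED = 'DRAINED'


# States treated as terminal, copied from the _is_in_terminate_state hunk
# in PR #4874.
TERMINAL_STATES = frozenset([
    PipelineState.STOPPED, PipelineState.DONE,
    PipelineState.FAILED, PipelineState.CANCELLED,
    PipelineState.UPDATED, PipelineState.DRAINED,
])


def is_in_terminal_state(job_state):
  """Return True if job_state is one of TERMINAL_STATES."""
  return job_state in TERMINAL_STATES
```

Keeping the states in a module-level `frozenset` makes the check a constant-time lookup and gives other callers, such as the `dataflow_runner.py` TODO linked in the comment, one shared definition instead of a list rebuilt on every call.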




Issue Time Tracking
---

Worklog Id: (was: 82453)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82450=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82450
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 20:04
Start Date: 20/Mar/18 20:04
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175904585
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
     return self.result
+
+  def _is_in_terminate_state(self, job_state):
+    return job_state in [
+        PipelineState.STOPPED, PipelineState.DONE,
+        PipelineState.FAILED, PipelineState.CANCELLED,
+        PipelineState.UPDATED, PipelineState.DRAINED,
+    ]
+
+  def wait_until_running(self):
 
 Review comment:
   Sounds good.
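For reference, the polling a streaming `wait_until_running()` needs can be sketched as below. This is a hypothetical standalone sketch, not the code from the PR: job states are plain strings, and `get_state`, `poll_interval`, `timeout`, `sleep` and `clock` are illustrative parameters added so the loop can be exercised without a Dataflow job. It returns as soon as the job is RUNNING or has reached a terminal state, so a job that fails during startup does not block the test until the timeout.

```python
import time

# Hypothetical standalone sketch, not the PR's wait_until_running().
# Terminal states mirror the _is_in_terminate_state list in this PR.
TERMINAL_STATES = frozenset(
    ['STOPPED', 'DONE', 'FAILED', 'CANCELLED', 'UPDATED', 'DRAINED'])


def wait_until_running(get_state, poll_interval=5.0, timeout=600.0,
                       sleep=time.sleep, clock=time.time):
  """Poll get_state() until the job is RUNNING or in a terminal state.

  Returns the last observed state; raises RuntimeError on timeout.
  """
  deadline = clock() + timeout
  while True:
    state = get_state()
    if state == 'RUNNING' or state in TERMINAL_STATES:
      return state
    if clock() >= deadline:
      raise RuntimeError('Job not RUNNING after %s seconds; last state: %s'
                         % (timeout, state))
    sleep(poll_interval)


# Usage with a scripted sequence of states and a no-op sleep.
states = iter(['STARTING', 'STARTING', 'RUNNING'])
print(wait_until_running(lambda: next(states), sleep=lambda s: None))
# prints RUNNING
```

Returning the observed state, rather than asserting on it, leaves the decision to the caller; in the PR that role is played by the `on_success_matcher` (for example `PipelineStateMatcher(PipelineState.RUNNING)`).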




Issue Time Tracking
---

Worklog Id: (was: 82450)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82451
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 20:04
Start Date: 20/Mar/18 20:04
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175904723
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 Review comment:
   I understand this, but for verification I think we need more than that. It 
is fine as one step.




Issue Time Tracking
---

Worklog Id: (was: 82451)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82445
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:55
Start Date: 20/Mar/18 19:55
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175902287
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeine in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test is blocked until manually terminate the pipeline job.
 
 Review comment:
   done




Issue Time Tracking
---

Worklog Id: (was: 82445)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82447=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82447
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:55
Start Date: 20/Mar/18 19:55
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175902341
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_utils.py
 ##
 @@ -129,3 +131,47 @@ def delete_files(file_paths):
     raise RuntimeError('Clean up failed. Invalid file path: %s.' %
                        file_paths)
   FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+  """Wait for all PubSub subscriptions are created."""
+  return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+  """Wait for all PubSub topics are created."""
+  return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+  need_wait = set(components)
+  start_time = time.time()
+  while time.time() - start_time <= timeout:
+    for c in components:
+      if c in need_wait and c.exists():
+        need_wait.remove(c)
+    if len(need_wait) == 0:
+      return True
+    time.sleep(2)
+
+  raise RuntimeError('Timeout after %d seconds. %d of %d topics/subscriptions '
 
 Review comment:
   sure. done.




Issue Time Tracking
---

Worklog Id: (was: 82447)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82446
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:55
Start Date: 20/Mar/18 19:55
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175902299
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeine in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test is blocked until manually terminate the pipeline job.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscription([self.input_sub, self.output_sub])
 
 Review comment:
   done




Issue Time Tracking
---

Worklog Id: (was: 82446)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82435=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82435
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:42
Start Date: 20/Mar/18 19:42
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175896233
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeine in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test is blocked until manually terminate the pipeline job.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscription([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('developing_test')
+  def test_streaming_wordcount_it(self):
+    # Set extra options to the pipeline for test purpose
+    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+    extra_opts = {'input_sub': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Generate input data and inject to PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_sub])
 
 Review comment:
   Actually, I want to wait and verify that the subscription exists before
injecting data into PubSub on the following line. The PubSub API does not seem
to guarantee that the subscription is created successfully, or immediately,
after calling `subscription.create()`. If the subscription is created after
data injection starts, data may be lost and the verification will not be
correct. So I added a check here.
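The ordering problem described in this comment can be illustrated with a toy model. `FakeTopic` and `inject_numbers` are hypothetical in-memory stand-ins, not the `google.cloud.pubsub` client: a published message reaches only the subscriptions that already exist, mirroring the default Pub/Sub behavior the comment relies on (a new subscription does not receive messages published before it was created). The sketch does not model the other half of the concern, `create()` returning before the subscription is usable; that is what polling `exists()` addresses.

```python
class FakeTopic(object):
  """Hypothetical in-memory topic (not the google.cloud.pubsub API)."""

  def __init__(self):
    # Maps subscription name -> list of messages delivered to it.
    self.subscriptions = {}

  def create_subscription(self, name):
    self.subscriptions[name] = []

  def publish(self, data):
    # Deliver only to subscriptions that exist right now; earlier
    # messages are never replayed to a subscription created later.
    for queue in self.subscriptions.values():
      queue.append(data)


def inject_numbers(topic, num_messages):
  """Publish '0', '1', ... in the same way as _inject_numbers in the test."""
  for n in range(num_messages):
    topic.publish(str(n))


# Wrong order: publish first, subscribe afterwards.
late = FakeTopic()
inject_numbers(late, 5)
late.create_subscription('wc_subscription_input')

# Right order: make sure the subscription exists, then publish.
ready = FakeTopic()
ready.create_subscription('wc_subscription_input')
assert 'wc_subscription_input' in ready.subscriptions
inject_numbers(ready, 5)

print(late.subscriptions['wc_subscription_input'])   # []
print(ready.subscriptions['wc_subscription_input'])  # ['0', '1', '2', '3', '4']
```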




Issue Time Tracking
---

Worklog Id:  

[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82431=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82431
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:39
Start Date: 20/Mar/18 19:39
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175897788
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_utils.py
 ##
 @@ -129,3 +131,47 @@ def delete_files(file_paths):
     raise RuntimeError('Clean up failed. Invalid file path: %s.' %
                        file_paths)
   FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+  """Wait for all PubSub subscriptions are created."""
+  return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+  """Wait for all PubSub topics are created."""
+  return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+  need_wait = set(components)
+  start_time = time.time()
+  while time.time() - start_time <= timeout:
+    for c in components:
 
 Review comment:
   If I replace `components` with `need_wait` in line 150, I get:
   
   RuntimeError: Set changed size during iteration.
   
   I guess we cannot modify a set while iterating over it.
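The error is standard CPython behavior: removing elements from a set while a `for` loop is iterating over that same set raises `RuntimeError`. The PR avoids it by iterating over the unchanging `components` list and checking membership in `need_wait`. An equivalent alternative, sketched here, iterates over a snapshot, `list(need_wait)`, so only still-missing components are polled. `FakeComponent` and the injectable `sleep`/`clock` parameters are hypothetical, added so the helper runs without Pub/Sub.

```python
import time


def demo_set_mutation_error():
  """Reproduce the error from the review comment."""
  items = {1, 2, 3}
  try:
    for x in items:
      items.remove(x)  # Mutates the set being iterated.
  except RuntimeError as e:
    return str(e)
  return None


def wait_until_all_exist(components, timeout=60, poll_interval=2,
                         sleep=time.sleep, clock=time.time):
  """Poll until every component's exists() is True, or raise on timeout."""
  need_wait = set(components)
  start_time = clock()
  while clock() - start_time <= timeout:
    # Iterate over a snapshot so removing from need_wait is safe.
    for c in list(need_wait):
      if c.exists():
        need_wait.remove(c)
    if not need_wait:
      return True
    sleep(poll_interval)
  raise RuntimeError('Timeout after %d seconds. %d of %d components do not '
                     'exist.' % (timeout, len(need_wait), len(components)))


class FakeComponent(object):
  """Hypothetical topic/subscription that exists after N exists() calls."""

  def __init__(self, polls_until_ready):
    self.polls_left = polls_until_ready

  def exists(self):
    self.polls_left -= 1
    return self.polls_left <= 0


print(demo_set_mutation_error())  # Set changed size during iteration
print(wait_until_all_exist([FakeComponent(1), FakeComponent(3)],
                           sleep=lambda s: None))  # True
```

Both versions poll each component at most once per pass; the snapshot form simply stops calling `exists()` on components that are already confirmed.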




Issue Time Tracking
---

Worklog Id: (was: 82431)
Time Spent: 2h 20m  (was: 2h 10m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82428=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82428
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:34
Start Date: 20/Mar/18 19:34
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175896233
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeline in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test is blocked until manually terminate the pipeline job.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscription([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('developing_test')
+  def test_streaming_wordcount_it(self):
+    # Set extra options to the pipeline for test purpose
+    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+    extra_opts = {'input_sub': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Generate input data and inject to PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_sub])
 
 Review comment:
   Actually, I want to wait and verify that the subscription exists before 
injecting data into PubSub on the following line. The PubSub API does not seem 
to guarantee that a subscription is created successfully, or immediately, after 
`subscription.create()` is called. If the subscription is created after data 
injection starts, some messages may be missed and verification will be 
incorrect.
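The race described above can be illustrated with a toy in-memory model. This is a minimal sketch, assuming the default Cloud Pub/Sub behaviour that a subscription only receives messages published after it was created; `ToyTopic` and its methods are hypothetical and are not the `google.cloud.pubsub` client API.

```python
class ToyTopic(object):
  """Hypothetical in-memory stand-in for a PubSub topic (not the real API).

  Mirrors the default Cloud Pub/Sub behaviour relevant here: a message is
  delivered only to subscriptions that already exist when it is published.
  """

  def __init__(self):
    self.subscriptions = {}  # subscription name -> list of delivered messages

  def create_subscription(self, name):
    self.subscriptions[name] = []

  def publish(self, data):
    # Fan the message out to the subscriptions that exist right now only.
    for messages in self.subscriptions.values():
      messages.append(data)


topic = ToyTopic()
topic.publish('0')  # No subscription exists yet, so nobody receives this.
topic.create_subscription('wc_subscription_input')
for n in range(1, 5):
  topic.publish(str(n))

received = topic.subscriptions['wc_subscription_input']
print('received %d of 5 messages: %s' % (len(received), received))
```

Because the first message is silently dropped rather than reported as an error, a verifier that counts output messages would fail in a way that is hard to diagnose, which is the motivation for waiting on the subscription first.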




Issue Time Tracking
---

Worklog Id: (was: 82428)

[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82409=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82409
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 18:53
Start Date: 20/Mar/18 18:53
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175884963
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
   print (
   'Found: https://console.cloud.google.com/dataflow/jobsDetail'
   '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 Review comment:
   I think this is a desirable step for a streaming test before running the 
verifiers. In an integration test class, people can use `PipelineStateMatcher` 
to get a clear signal that the pipeline has entered the RUNNING state (like 
[here](https://github.com/apache/beam/pull/4874/files#diff-a7940d9c8528322f7e20488c31ee47d3R85)).
 That way they can confidently define or add subsequent actions (such as 
customized verifiers). 




Issue Time Tracking
---

Worklog Id: (was: 82409)
Time Spent: 2h  (was: 1h 50m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82402
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 18:43
Start Date: 20/Mar/18 18:43
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175881878
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
   print (
   'Found: https://console.cloud.google.com/dataflow/jobsDetail'
   '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 if on_success_matcher:
   from hamcrest import assert_that as hc_assert_that
   hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
 return self.result
+
+  def _is_in_terminate_state(self, job_state):
+    return job_state in [
+        PipelineState.STOPPED, PipelineState.DONE,
+        PipelineState.FAILED, PipelineState.CANCELLED,
+        PipelineState.UPDATED, PipelineState.DRAINED,
+    ]
+
+  def wait_until_running(self):
 
 Review comment:
   That's exactly what I did in the following 
[PR](https://github.com/markflyhigh/incubator-beam/pull/2/files#diff-e48ff65b992d87457791341246d796f9R83).
 How about we generalize it in the future?




Issue Time Tracking
---

Worklog Id: (was: 82402)
Time Spent: 1h 50m  (was: 1h 40m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82397=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82397
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 20/Mar/18 18:36
Start Date: 20/Mar/18 18:36
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175879745
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
   print (
   'Found: https://console.cloud.google.com/dataflow/jobsDetail'
   '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 if on_success_matcher:
   from hamcrest import assert_that as hc_assert_that
   hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
 return self.result
+
+  def _is_in_terminate_state(self, job_state):
 
 Review comment:
   Sure. The reason I didn't use it is that `_is_in_terminal_state` is a 
private function. Should we make it public (rename it to `is_in_terminal_state`)?
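If the helper does become public, one option is to keep the terminal states in a shared, immutable module-level constant so that the test runner and `DataflowPipelineResult` check the same list. A minimal sketch; `PipelineState` below is a simplified stand-in, and `TERMINAL_STATES` and `is_in_terminal_state` are hypothetical names rather than existing Beam API:

```python
class PipelineState(object):
  """Simplified, hypothetical stand-in for Beam's PipelineState constants."""
  RUNNING = 'RUNNING'
  STOPPED = 'STOPPED'
  DONE = 'DONE'
  FAILED = 'FAILED'
  CANCELLED = 'CANCELLED'
  UPDATED = 'UPDATED'
  DRAINING = 'DRAINING'
  DRAINED = 'DRAINED'


# The six states the diff treats as terminal in `_is_in_terminate_state`.
TERMINAL_STATES = frozenset([
    PipelineState.STOPPED, PipelineState.DONE,
    PipelineState.FAILED, PipelineState.CANCELLED,
    PipelineState.UPDATED, PipelineState.DRAINED,
])


def is_in_terminal_state(job_state):
  """Returns True if `job_state` is one of TERMINAL_STATES."""
  return job_state in TERMINAL_STATES
```

Using a `frozenset` keeps the membership check cheap and prevents callers from mutating the shared collection; whether it lives at module level or behind a public method on the result class is a design choice.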




Issue Time Tracking
---

Worklog Id: (was: 82397)
Time Spent: 1h 40m  (was: 1.5h)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82091=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82091
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175616259
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_utils.py
 ##
 @@ -129,3 +131,47 @@ def delete_files(file_paths):
 raise RuntimeError('Clean up failed. Invalid file path: %s.' %
file_paths)
   FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+  """Wait until all PubSub subscriptions are created."""
+  return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+  """Wait until all PubSub topics are created."""
+  return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+  need_wait = set(components)
+  start_time = time.time()
+  while time.time() - start_time <= timeout:
+    for c in components:
 
 Review comment:
   I think you can do `for c in need_wait` instead, to iterate over a shorter 
set, and drop the `c in need_wait` check below.




Issue Time Tracking
---

Worklog Id: (was: 82091)
Time Spent: 1h 20m  (was: 1h 10m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82085=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82085
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175615483
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeline in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test is blocked until manually terminate the pipeline job.
 
 Review comment:
   How about:
   
   Currently, this test blocks until the job is manually terminated.




Issue Time Tracking
---

Worklog Id: (was: 82085)
Time Spent: 0.5h  (was: 20m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82088=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82088
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175616557
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_utils.py
 ##
 @@ -129,3 +131,47 @@ def delete_files(file_paths):
 raise RuntimeError('Clean up failed. Invalid file path: %s.' %
file_paths)
   FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+  """Wait until all PubSub subscriptions are created."""
+  return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+  """Wait until all PubSub topics are created."""
+  return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+  need_wait = set(components)
+  start_time = time.time()
+  while time.time() - start_time <= timeout:
+    for c in components:
+      if c in need_wait and c.exists():
+        need_wait.remove(c)
+    if len(need_wait) == 0:
+      return True
+    time.sleep(2)
+
+  raise RuntimeError('Timeout after %d seconds. %d of %d topics/subscriptions '
 
 Review comment:
   Would it be helpful to log which topics/subscriptions do not exist?
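One way to act on this suggestion is to name the missing resources in the timeout error. A minimal sketch, assuming each component exposes `exists()` and `full_name` as the topics and subscriptions in the test file do; `FakeResource` and the `interval` parameter are hypothetical additions for illustration, and the loop iterates over a snapshot of the pending set to avoid `RuntimeError: Set changed size during iteration`:

```python
import time


def wait_until_all_exist(components, timeout=60, interval=2):
  """Polls `c.exists()` until every component exists or `timeout` expires.

  Returns True on success. On timeout, raises RuntimeError naming the
  components that still do not exist.
  """
  need_wait = set(components)
  start_time = time.time()
  while time.time() - start_time <= timeout:
    # Iterate over a snapshot so removing ready components is safe.
    for c in list(need_wait):
      if c.exists():
        need_wait.remove(c)
    if not need_wait:
      return True
    time.sleep(interval)

  missing = sorted(getattr(c, 'full_name', repr(c)) for c in need_wait)
  raise RuntimeError(
      'Timeout after %d seconds. %d of %d topics/subscriptions do not exist: '
      '%s' % (timeout, len(need_wait), len(components), ', '.join(missing)))


class FakeResource(object):
  """Hypothetical stand-in for a PubSub topic or subscription."""

  def __init__(self, full_name, exists):
    self.full_name = full_name
    self._exists = exists

  def exists(self):
    return self._exists


try:
  wait_until_all_exist(
      [FakeResource('projects/p/topics/wc_topic_input', True),
       FakeResource('projects/p/subscriptions/wc_subscription_input', False)],
      timeout=1, interval=0.1)
except RuntimeError as e:
  print(e)
```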




Issue Time Tracking
---

Worklog Id: (was: 82088)
Time Spent: 50m  (was: 40m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82087=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82087
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175590831
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
   print (
   'Found: https://console.cloud.google.com/dataflow/jobsDetail'
   '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 if on_success_matcher:
   from hamcrest import assert_that as hc_assert_that
   hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
 return self.result
+
+  def _is_in_terminate_state(self, job_state):
 
 Review comment:
   Name this `_is_in_terminal_state`.
   
   Can you use `_is_in_terminal_state` from `DataflowPipelineResult`?




Issue Time Tracking
---

Worklog Id: (was: 82087)
Time Spent: 40m  (was: 0.5h)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82089=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82089
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175615705
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeline in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test is blocked until manually terminate the pipeline job.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscription([self.input_sub, self.output_sub])
 
 Review comment:
   Rename to `cleanup_subscriptions`?




Issue Time Tracking
---

Worklog Id: (was: 82089)
Time Spent: 1h  (was: 50m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82090=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82090
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175591205
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
   print (
   'Found: https://console.cloud.google.com/dataflow/jobsDetail'
   '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 if on_success_matcher:
   from hamcrest import assert_that as hc_assert_that
   hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
 return self.result
+
+  def _is_in_terminate_state(self, job_state):
+    return job_state in [
+        PipelineState.STOPPED, PipelineState.DONE,
+        PipelineState.FAILED, PipelineState.CANCELLED,
+        PipelineState.UPDATED, PipelineState.DRAINED,
+    ]
+
+  def wait_until_running(self):
 
 Review comment:
   Maybe you can generalize this into waiting until a given state, with a 
timeout (pass the desired state and the timeout as arguments).
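A minimal sketch of the generalization suggested here, written as a free function over a `get_state` callable so it does not depend on Dataflow internals. The function name, parameters, default timeouts, and the state strings in the usage line are assumptions for illustration, not Beam's API; in the runner, `get_state` would be whatever call fetches the job's current state.

```python
import time

# The states the diff's `_is_in_terminate_state` treats as terminal.
TERMINAL_STATES = frozenset(
    ['STOPPED', 'DONE', 'FAILED', 'CANCELLED', 'UPDATED', 'DRAINED'])


def wait_until_state(get_state, expected_states, timeout=600, interval=5):
  """Polls `get_state()` until it returns a state in `expected_states`.

  Returns the state that was reached. Raises RuntimeError if the job
  reaches an unexpected terminal state or if `timeout` seconds elapse.
  """
  start_time = time.time()
  while time.time() - start_time <= timeout:
    state = get_state()
    if state in expected_states:
      return state
    if state in TERMINAL_STATES:
      raise RuntimeError('Job reached terminal state %s while waiting for %s'
                         % (state, sorted(expected_states)))
    time.sleep(interval)
  raise RuntimeError('Timeout after %d seconds while waiting for %s'
                     % (timeout, sorted(expected_states)))


# Usage with a hypothetical sequence of job states.
states = iter(['STARTING', 'STARTING', 'RUNNING'])
print(wait_until_state(lambda: next(states), {'RUNNING'}, timeout=5, interval=0))
```

Checking `expected_states` before `TERMINAL_STATES` lets the same helper serve batch jobs too, e.g. waiting for `{'DONE'}`.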




Issue Time Tracking
---

Worklog Id: (was: 82090)
Time Spent: 1h 10m  (was: 1h)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82092=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82092
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175615878
 
 

 ##
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeline in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test is blocked until manually terminate the pipeline job.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscription([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('developing_test')
+  def test_streaming_wordcount_it(self):
+    # Set extra options to the pipeline for test purpose
+    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+    extra_opts = {'input_sub': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Generate input data and inject to PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_sub])
 
 Review comment:
   You already created subscriptions in `setUp`. Do you need to check it here?
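   For context on this question: a "wait until created" check typically polls each resource until it reports that it exists, or gives up after a timeout. The sketch below is a minimal illustration under that assumption only. `wait_until_all_exist` is a hypothetical name (not the `test_utils` helper in the diff), and it assumes nothing more than that each resource exposes an `exists()` method.

```python
import time


def wait_until_all_exist(resources, timeout=10.0, interval=2.0,
                         clock=time.monotonic, sleep=time.sleep):
  """Hypothetical helper: poll until every resource reports exists().

  Returns True once all resources exist; raises RuntimeError on timeout.
  `clock` and `sleep` are injectable so the loop can be tested offline.
  """
  pending = list(resources)
  deadline = clock() + timeout
  while True:
    # Keep only the resources that are not visible yet.
    pending = [r for r in pending if not r.exists()]
    if not pending:
      return True
    if clock() >= deadline:
      raise RuntimeError(
          'Timed out waiting for %d resource(s) to exist' % len(pending))
    sleep(interval)
```

   Whether a second check in the test body adds anything depends on whether a resource is already visible by the time `create()` returns; the sketch itself makes no assumption either way.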


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 82092)
Time Spent: 1.5h  (was: 1h 20m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  

[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82086=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82086
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 19/Mar/18 23:27
Start Date: 19/Mar/18 23:27
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4874: 
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#discussion_r175591382
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##
 @@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
 Review comment:
   What is the point of waiting until running? (i.e. What are we verifying by 
doing this?) Is this going to be addressed in the follow up PR?
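   For context: a streaming job does not finish on its own, so `wait_until_finish()` would block. Polling the job state until it reaches RUNNING gives the test a point after which it can start injecting input. The sketch below is a minimal illustration under stated assumptions. `wait_until_running` here is a hypothetical standalone function, not the `TestDataflowRunner.wait_until_running` method from the PR. It assumes a result object with a `state` attribute whose values are plain strings named after Beam's `PipelineState` values.

```python
import time

# Assumption: states are plain strings named after Beam's PipelineState values.
TERMINAL_STATES = frozenset(['DONE', 'FAILED', 'CANCELLED', 'UPDATED', 'DRAINED'])


def wait_until_running(result, timeout=600.0, interval=5.0,
                       clock=time.monotonic, sleep=time.sleep):
  """Hypothetical helper: poll result.state until the job is RUNNING.

  Returns 'RUNNING'. Raises RuntimeError if the job reaches a terminal
  state first or if the timeout expires.
  """
  deadline = clock() + timeout
  while True:
    state = result.state
    if state == 'RUNNING':
      return state
    if state in TERMINAL_STATES:
      raise RuntimeError('Job ended in state %s before running' % state)
    if clock() >= deadline:
      raise RuntimeError('Timed out while job was in state %s' % state)
    sleep(interval)
```

   Reaching RUNNING only shows that the job was accepted and started. As the TODO in the diff notes, it does not by itself show that workers started successfully.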



Issue Time Tracking
---

Worklog Id: (was: 82086)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=81389=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81389
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 16/Mar/18 21:45
Start Date: 16/Mar/18 21:45
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4874: [BEAM-3861] 
Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#issuecomment-373853545
 
 
   Rebased against master. It's ready to review.
   Please take a look @aaltay 



Issue Time Tracking
---

Worklog Id: (was: 81389)
Time Spent: 20m  (was: 10m)

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-3861) Build test infra for end-to-end streaming test in Python SDK

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=81232=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81232
 ]

ASF GitHub Bot logged work on BEAM-3861:


Author: ASF GitHub Bot
Created on: 16/Mar/18 17:15
Start Date: 16/Mar/18 17:15
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #4874: [BEAM-3861] 
Improve test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874#issuecomment-373783156
 
 
   +cc: @aaltay 



Issue Time Tracking
---

Worklog Id: (was: 81232)
Time Spent: 10m
Remaining Estimate: 0h

> Build test infra for end-to-end streaming test in Python SDK
> 
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
>  Issue Type: Task
>  Components: testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>



