[ https://issues.apache.org/jira/browse/BEAM-5706?focusedWorklogId=153961&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153961 ]
ASF GitHub Bot logged work on BEAM-5706:
Author: ASF GitHub Bot
Created on: 12/Oct/18 17:50
Start Date: 12/Oct/18 17:50
Worklog Time Spent: 10m
Work Description: charlesccychen closed pull request #6674: [BEAM-5706] Revert 324f0b3e3c (pull request #6564 from udim/pubsub-0-35-4)
URL: https://github.com/apache/beam/pull/6674
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 2fc19daa1af..6dc60d0a807 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -72,15 +72,15 @@ def setUp(self):
# Set up PubSub environment.
from google.cloud import pubsub
-self.pub_client = pubsub.PublisherClient()
-self.input_topic = self.pub_client.create_topic(
-self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
+self.pubsub_client = pubsub.Client(project=self.project)
+unique_topic_name = self.INPUT_TOPIC + _unique_id
+unique_subscrition_name = self.INPUT_SUB + _unique_id
+self.input_topic = self.pubsub_client.topic(unique_topic_name)
+self.input_sub = self.input_topic.subscription(unique_subscrition_name)
-self.sub_client = pubsub.SubscriberClient()
-self.input_sub = self.sub_client.create_subscription(
-self.sub_client.subscription_path(self.project,
- self.INPUT_SUB + _unique_id),
-self.input_topic.name)
+self.input_topic.create()
+test_utils.wait_for_topics_created([self.input_topic])
+self.input_sub.create()
# Set up BigQuery environment
from google.cloud import bigquery
@@ -95,15 +95,14 @@ def _inject_pubsub_game_events(self, topic, message_count):
"""Inject game events as test data to PubSub."""
logging.debug('Injecting %d game events to topic %s',
- message_count, topic.name)
+ message_count, topic.full_name)
for _ in range(message_count):
- self.pub_client.publish(topic.name,
- self.INPUT_EVENT % self._test_timestamp)
+ topic.publish(self.INPUT_EVENT % self._test_timestamp)
def _cleanup_pubsub(self):
-test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
-test_utils.cleanup_topics(self.pub_client, [self.input_topic])
+test_utils.cleanup_subscriptions([self.input_sub])
+test_utils.cleanup_topics([self.input_topic])
def _cleanup_dataset(self):
self.dataset.delete()
@@ -124,9 +123,9 @@ def test_game_stats_it(self):
# TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
-extra_opts = {'subscription': self.input_sub.name,
+extra_opts = {'subscription': self.input_sub.full_name,
'dataset': self.dataset.name,
- 'topic': self.input_topic.name,
+ 'topic': self.input_topic.full_name,
'fixed_window_duration': 1,
'user_activity_window_duration': 1,
'wait_until_finish_duration':
@@ -144,6 +143,8 @@ def test_game_stats_it(self):
self.dataset.name, self.OUTPUT_TABLE_TEAMS)
# Generate input data and inject to PubSub.
+test_utils.wait_for_subscriptions_created([self.input_topic,
+ self.input_sub])
self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
# Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index e0e309b1265..ab109425eb6 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -73,16 +73,15 @@ def setUp(self):
# Set up PubSub environment.
from google.cloud import pubsub
+self.pubsub_client = pubsub.Client(project=self.project)
+unique_topic_name = self.INPUT_TOPIC + _unique_id
+unique_subscrition_name = self.INPUT_SUB + _unique_id
+self.input_topic = self.pubsub_client.topic(unique_topic_name)
+self.input_sub = self.input_topic.subscription(unique_subscrition_name)
-self.pub_client = pubsub.PublisherClient()
-self.input_