This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d61a1fa  [BEAM-11517] Enable test_file_loads on Dataflow
     new e9a7b44  Merge pull request #13601 from [BEAM-11517] Fix test_file_loads on Dataflow
d61a1fa is described below

commit d61a1fa2d7feed8a9cf1473da7090e461815eac5
Author: Udi Meiri <eh...@google.com>
AuthorDate: Tue Dec 22 17:23:09 2020 -0800

    [BEAM-11517] Enable test_file_loads on Dataflow
---
 .../apache_beam/io/gcp/bigquery_file_loads.py       | 21 +++++++++++++--------
 sdks/python/apache_beam/io/gcp/bigquery_test.py     |  2 --
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 0342b4c..ae5ebcc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -794,7 +794,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
       partitions_direct_to_destination,
       load_job_name_pcv,
       copy_job_name_pcv,
-      singleton_pc,
+      p,
       step_name):
     """Load data to BigQuery
 
@@ -832,7 +832,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
     destination_copy_job_ids_pc = (
-        singleton_pc
+        p
+        | "ImpulseMonitorLoadJobs" >> beam.Create([None])
         | "WaitForTempTableLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(temp_tables_load_job_ids_pc))
@@ -845,7 +846,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
             copy_job_name_pcv))
 
     finished_copy_jobs_pc = (
-        singleton_pc
+        p
+        | "ImpulseMonitorCopyJobs" >> beam.Create([None])
         | "WaitForCopyJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(destination_copy_job_ids_pc)))
@@ -879,7 +881,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
             *self.schema_side_inputs))
 
     _ = (
-        singleton_pc
+        p
+        | "ImpulseMonitorDestLoadJobs" >> beam.Create([None])
         | "WaitForDestinationLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(destination_load_job_ids_pc)))
@@ -964,14 +967,16 @@ class BigQueryBatchFileLoads(beam.PTransform):
                           empty_pc,
                           load_job_name_pcv,
                           copy_job_name_pcv,
-                          singleton_pc,
+                          p,
                           step_name))
     else:
       destination_load_job_ids_pc, destination_copy_job_ids_pc = (
           self._load_data(multiple_partitions_per_destination_pc,
-                         single_partition_per_destination_pc,
-                         load_job_name_pcv, copy_job_name_pcv, singleton_pc,
-                         step_name))
+                          single_partition_per_destination_pc,
+                          load_job_name_pcv,
+                          copy_job_name_pcv,
+                          p,
+                          step_name))
 
     return {
         self.DESTINATION_JOBID_PAIRS: destination_load_job_ids_pc,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 86614b4..fefe461 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -1245,8 +1245,6 @@ class PubSubBigQueryIT(unittest.TestCase):
 
   @attr('IT')
   def test_file_loads(self):
-    if isinstance(self.test_pipeline.runner, TestDataflowRunner):
-      self.skipTest('https://issuetracker.google.com/issues/118375066')
     self._run_pubsub_bq_pipeline(
         WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
 
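For readers skimming the patch: the bigquery_file_loads.py hunks replace the shared singleton_pc argument with the pipeline object p, and create a fresh single-element impulse (beam.Create([None])) immediately before each WaitForBQJobs ParDo, so no two monitoring stages share a common upstream PCollection. A minimal sketch of the resulting shape, runnable on the direct runner; WaitForJobs, the "JobIds" label, and the job-ID strings are hypothetical stand-ins, not names from the Beam codebase:

    import apache_beam as beam


    class WaitForJobs(beam.DoFn):
      """Hypothetical stand-in for Beam's WaitForBQJobs DoFn."""

      def process(self, unused_element, job_ids):
        # The real DoFn polls BigQuery until every job in the side
        # input finishes; here we just re-emit the IDs to show the
        # data flow.
        for job_id in job_ids:
          yield job_id


    with beam.Pipeline() as p:
      job_ids_pc = p | 'JobIds' >> beam.Create(['job-1', 'job-2'])

      # The pattern from the patch: start the monitoring step from its
      # own single-element impulse on the pipeline object, rather than
      # reusing one shared singleton PCollection as a common root.
      _ = (
          p
          | 'ImpulseMonitorJobs' >> beam.Create([None])
          | 'WaitForJobs' >> beam.ParDo(
              WaitForJobs(), beam.pvalue.AsList(job_ids_pc)))

Giving each monitoring stage its own root appears to be what was needed for the pipeline to run on Dataflow, allowing the skip removed in bigquery_test.py to go away.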
