This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new d61a1fa  [BEAM-11517] Enable test_file_loads on Dataflow
     new e9a7b44  Merge pull request #13601 from [BEAM-11517] Fix test_file_loads on Dataflow
d61a1fa is described below

commit d61a1fa2d7feed8a9cf1473da7090e461815eac5
Author: Udi Meiri <eh...@google.com>
AuthorDate: Tue Dec 22 17:23:09 2020 -0800

    [BEAM-11517] Enable test_file_loads on Dataflow
---
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 21 +++++++++++++--------
 sdks/python/apache_beam/io/gcp/bigquery_test.py    |  2 --
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 0342b4c..ae5ebcc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -794,7 +794,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
       partitions_direct_to_destination,
       load_job_name_pcv,
       copy_job_name_pcv,
-      singleton_pc,
+      p,
       step_name):
     """Load data to BigQuery
 
@@ -832,7 +832,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
     destination_copy_job_ids_pc = (
-        singleton_pc
+        p
+        | "ImpulseMonitorLoadJobs" >> beam.Create([None])
         | "WaitForTempTableLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(temp_tables_load_job_ids_pc))
@@ -845,7 +846,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
             copy_job_name_pcv))
 
     finished_copy_jobs_pc = (
-        singleton_pc
+        p
+        | "ImpulseMonitorCopyJobs" >> beam.Create([None])
         | "WaitForCopyJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(destination_copy_job_ids_pc)))
@@ -879,7 +881,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
             *self.schema_side_inputs))
 
     _ = (
-        singleton_pc
+        p
+        | "ImpulseMonitorDestLoadJobs" >> beam.Create([None])
         | "WaitForDestinationLoadJobs" >> beam.ParDo(
             WaitForBQJobs(self.test_client),
             beam.pvalue.AsList(destination_load_job_ids_pc)))
@@ -964,14 +967,16 @@ class BigQueryBatchFileLoads(beam.PTransform):
               empty_pc,
               load_job_name_pcv,
               copy_job_name_pcv,
-              singleton_pc,
+              p,
               step_name))
     else:
       destination_load_job_ids_pc, destination_copy_job_ids_pc = (
           self._load_data(multiple_partitions_per_destination_pc,
-                         single_partition_per_destination_pc,
-                         load_job_name_pcv, copy_job_name_pcv, singleton_pc,
-                         step_name))
+                          single_partition_per_destination_pc,
+                          load_job_name_pcv,
+                          copy_job_name_pcv,
+                          p,
+                          step_name))
 
     return {
         self.DESTINATION_JOBID_PAIRS: destination_load_job_ids_pc,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 86614b4..fefe461 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -1245,8 +1245,6 @@ class PubSubBigQueryIT(unittest.TestCase):
 
   @attr('IT')
   def test_file_loads(self):
-    if isinstance(self.test_pipeline.runner, TestDataflowRunner):
-      self.skipTest('https://issuetracker.google.com/issues/118375066')
     self._run_pubsub_bq_pipeline(
         WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
 