[ 
https://issues.apache.org/jira/browse/BEAM-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pablo Estrada resolved BEAM-7856.
---------------------------------
    Fix Version/s: Not applicable
       Resolution: Fixed

> BigQuery table creation race condition error when executing pipeline on 
> multiple workers
> ----------------------------------------------------------------------------------------
>
>                 Key: BEAM-7856
>                 URL: https://issues.apache.org/jira/browse/BEAM-7856
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> This is a non-fatal issue and just prints an error in the logs, as far as I 
> can tell.
> The issue occurs when we check for and create a BigQuery table on multiple 
> workers at the same time, which causes a race condition.
>  
> {noformat}
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 157, in _execute response = task() File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 190, in <lambda> self._execute(lambda: worker.do_instruction(work), 
> work) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 342, in do_instruction request.instruction_id) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 368, in process_bundle bundle_processor.process_bundle(instruction_id)) 
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 593, in process_bundle data.ptransform_id].process_encoded(data.data) 
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 143, in process_encoded self.output(decoded_value) File 
> "apache_beam/runners/worker/operations.py", line 255, in 
> apache_beam.runners.worker.operations.Operation.output def output(self, 
> windowed_value, output_index=0): File 
> "apache_beam/runners/worker/operations.py", line 256, in 
> apache_beam.runners.worker.operations.Operation.output cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 857, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented raise File 
> "apache_beam/runners/common.py", line 803, in 
> apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 610, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> self._invoke_process_per_window( File "apache_beam/runners/common.py", line 
> 682, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
> output_processor.process_outputs( File "apache_beam/runners/common.py", line 
> 903, in apache_beam.runners.common._OutputProcessor.process_outputs def 
> process_outputs(self, windowed_input_element, results): File 
> "apache_beam/runners/common.py", line 942, in 
> apache_beam.runners.common._OutputProcessor.process_outputs 
> self.main_receivers.receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 857, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented raise File 
> "apache_beam/runners/common.py", line 803, in 
> apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 465, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process 
> output_processor.process_outputs( File "apache_beam/runners/common.py", line 
> 942, in apache_beam.runners.common._OutputProcessor.process_outputs 
> self.main_receivers.receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 857, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented raise File 
> "apache_beam/runners/common.py", line 803, in 
> apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 465, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process 
> output_processor.process_outputs( File "apache_beam/runners/common.py", line 
> 942, in apache_beam.runners.common._OutputProcessor.process_outputs 
> self.main_receivers.receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 872, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> raise_with_traceback(new_exn) File "apache_beam/runners/common.py", line 803, 
> in apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 466, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process windowed_value, 
> self.process_method(windowed_value.value)) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 
> 819, in process schema) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 
> 804, in _create_table_if_needed 
> additional_create_parameters=self.additional_bq_parameters) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 
> 197, in wrapper return fun(*args, **kwargs) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery_tools.py",
>  line 667, in get_or_create_table 
> additional_parameters=additional_create_parameters) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery_tools.py",
>  line 444, in _create_table response = self.client.tables.Insert(request) 
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py",
>  line 606, in Insert config, request, global_params=global_params) File 
> "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 
> 731, in _RunMethod return self.ProcessHttpResponse(method_config, 
> http_response, request) File 
> "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 
> 737, in ProcessHttpResponse self.__ProcessHttpResponse(method_config, 
> http_response, request)) File 
> "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 
> 604, in __ProcessHttpResponse http_response, method_config=method_config, 
> request=request) RuntimeError: HttpConflictError: HttpError accessing 
> <https://www.googleapis.com/bigquery/v2/projects/google.com%3Aclouddfe/datasets/integration_test_data/tables?alt=json>:
>  response: <{'status': '409', 'content-length': '440', 'x-xss-protection': 
> '0', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 
> 'vary': 'Origin, X-Origin, Referer', 'server': 'ESF', '-content-encoding': 
> 'gzip', 'cache-control': 'private', 'date': 'Wed, 31 Jul 2019 03:15:15 GMT', 
> 'x-frame-options': 'SAMEORIGIN', 'content-type': 'application/json; 
> charset=UTF-8'}>, content <{ "error": { "code": 409, "message": "Already 
> Exists: Table 
> google.com:clouddfe:integration_test_data.dataflow_status_by_environment_python_07302008337162",
>  "errors": [ { "message": "Already Exists: Table 
> google.com:clouddfe:integration_test_data.dataflow_status_by_environment_python_07302008337162",
>  "domain": "global", "reason": "duplicate" } ], "status": "ALREADY_EXISTS" } 
> } > [while running 'generatedPtransform-1091'] "  portability_worker_id: 
> "sdk0"  thread: "ThreadPoolExecutor-1_9"  worker: 
> "df2-long-running-streamin-07302010-wy16-harness-wn53"  } labels: {…}  
> logName: "projects/google.com:clouddfe/logs/dataflow.googleapis.com%2Fworker" 
>  receiveTimestamp: "2019-07-31T03:15:16.031402142Z"  resource: {…}  severity: 
> "ERROR"  timestamp: "2019-07-31T03:15:15.369057893Z"  }
> {noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to