Hmm, the code doesn't even get to the retry policy, because the insertAll call throws a GoogleJsonResponseException rather than returning a TableDataInsertAllResponse with InsertErrors in it. I think the BigQuery client library didn't use to throw GoogleJsonResponseException in this case. I'm not sure whether this is a bug in BigQueryIO, in BigQuery, in the BigQuery client library, or in something else, but overall it is definitely a bug. Would you mind filing a JIRA to track this at https://issues.apache.org/jira/browse/BEAM ?
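To make the two code paths concrete, here is a minimal, dependency-free sketch. Everything in it (RetryPathSketch, Sink, RequestRejected, Outcome, flush, isTransient) is a hypothetical stand-in that I made up for illustration; none of it is Beam or BigQuery client code. It only models the control flow described above: a per-row policy is consulted when errors come back inside the response, and is never reached when the whole request throws.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetryPathSketch {
    /** Hypothetical request-level failure, standing in for an HTTP 400 on the whole call. */
    static final class RequestRejected extends RuntimeException {
        RequestRejected(String reason) {
            super("request rejected, reason=" + reason);
        }
    }

    /** Hypothetical sink: returns one error reason per row (null means the row was accepted), or throws. */
    interface Sink {
        List<String> insertAll(List<String> rows);
    }

    /** Result of one flush: rows the policy would retry, and rows it gives up on. */
    static final class Outcome {
        final List<String> retried = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
    }

    /** Assumed policy in the spirit of retryTransientErrors(): never retry reason "invalid". */
    static boolean isTransient(String reason) {
        return !"invalid".equals(reason);
    }

    /** The policy is only consulted for errors that are returned; a thrown exception skips it. */
    static Outcome flush(Sink sink, List<String> rows) {
        Outcome out = new Outcome();
        List<String> reasons = sink.insertAll(rows); // a RequestRejected propagates from here
        for (int i = 0; i < rows.size(); i++) {
            String reason = reasons.get(i);
            if (reason == null) {
                continue; // row accepted
            }
            if (isTransient(reason)) {
                out.retried.add(rows.get(i));
            } else {
                out.failed.add(rows.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c");

        // Path 1: errors come back per row, so the policy decides for each row.
        Outcome perRow = flush(r -> Arrays.asList(null, "invalid", "timeout"), rows);
        System.out.println("failed=" + perRow.failed + " retried=" + perRow.retried);

        // Path 2: the whole request is rejected, so the policy is never asked.
        try {
            flush(r -> { throw new RequestRejected("invalid"); }, rows);
        } catch (RequestRejected e) {
            System.out.println("policy bypassed: " + e.getMessage());
        }
    }
}
```

In the second path the caller only sees an exception, so whatever generic handling sits above it applies, regardless of the reason carried in the error.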
On Thu, Nov 23, 2017 at 4:00 AM Carsten Krebs | GameDuell <[email protected]> wrote:

> Hi,
>
> I'm currently trying to stream data into a BigQuery table using Beam. As the
> retry policy for failed tuples I'm using
> InsertRetryPolicy.retryTransientErrors() (see below). From reading the code,
> my expectation was that rows which cause an error with the reason "invalid"
> would not be retried. However, what I'm observing is that these rows are
> retried over and over again.
>
> Are my assumptions wrong? Am I doing something wrong, or is this a bug?
>
> Any help?
>
> Thanks,
>
> Carsten
>
>
> Code Snippet:
>
> p.apply(BigQueryIO.writeTableRows()
>         .to(new DatePartitionedTableSpecifier(tableReference, "tracking data"))
>         .withSchema(schema)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()))
>     // write all failed inserts to a DMQ
>     .getFailedInserts().apply(MapElements.via(new SimpleFunction<TableRow, PubsubMessage>() {
>         public PubsubMessage apply(final TableRow _row) {
>             try {
>                 return new PubsubMessage(JacksonFactory.getDefaultInstance().toByteArray(_row),
>                     Collections.<String, String>emptyMap());
>             } catch (IOException e) {
>                 throw new RuntimeException("failed to write to DMQ", e);
>             }
>         }
>     })).apply(PubsubIO.writeMessages().to("projects/gameduell-bits-bigquery-poc/topics/dmq"));
>
>
> Error:
>
> (1a04bdb0d43aca9c): java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> {
>   "code" : 400,
>   "errors" : [ {
>     "domain" : "global",
>     "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>     "reason" : "invalid"
>   } ],
>   "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>   "status" : "INVALID_ARGUMENT"
> }
>     org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
>     org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
>     org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
>     org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
> Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> {
>   "code" : 400,
>   "errors" : [ {
>     "domain" : "global",
>     "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>     "reason" : "invalid"
>   } ],
>   "message" : "The destination table's partition rum$20170925 is outside the allowed bounds. You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>   "status" : "INVALID_ARGUMENT"
> }
>     com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
>     com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
>     com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
>     com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
>     com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
>     com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
>     com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
>     com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
>     org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
>     org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
>     java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
>
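For reference, the bound quoted in the error message (31 days in the past, 16 days in the future) can be checked before a row is streamed. The sketch below is only an illustration of that arithmetic. The class and method names are hypothetical, treating both limits as inclusive is my assumption, and "today" is passed in explicitly because I don't know which clock or time zone BigQuery uses for "the current date".

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class PartitionBoundsSketch {
    // Limits taken from the error message; inclusivity is an assumption.
    static final int MAX_DAYS_PAST = 31;
    static final int MAX_DAYS_FUTURE = 16;

    /**
     * Returns true if the yyyyMMdd partition suffix (the part after '$')
     * lies within the streaming window around the given date.
     */
    static boolean isStreamable(String partitionSuffix, LocalDate today) {
        LocalDate day = LocalDate.parse(partitionSuffix, DateTimeFormatter.BASIC_ISO_DATE);
        long delta = ChronoUnit.DAYS.between(today, day); // negative means in the past
        return delta >= -MAX_DAYS_PAST && delta <= MAX_DAYS_FUTURE;
    }

    public static void main(String[] args) {
        // Uses the date of the quoted mail as "today", purely for illustration.
        LocalDate today = LocalDate.of(2017, 11, 23);
        System.out.println(isStreamable("20170925", today));
        System.out.println(isStreamable("20171123", today));
    }
}
```

Rows that fail such a check could be routed straight to the DMQ instead of being sent to BigQuery, which would sidestep the request-level 400 for this particular error while the underlying bug is tracked.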
