Thanks for your support. The bug filed for this is https://issues.apache.org/jira/browse/BEAM-3271
Best,
Carsten

> On 28. Nov 2017, at 00:11, Eugene Kirpichov <[email protected]> wrote:
>
> Upon talking to the BigQuery team, it became clear that this is indeed a bug
> in BigQueryIO. This error is not reported via InsertErrors because the
> InsertAll request specifies the table once rather than per row, and the table
> is invalid, so all rows in the batch are invalid. Beam should handle this.
>
> On Mon, Nov 27, 2017 at 10:48 AM Eugene Kirpichov <[email protected]> wrote:
> Hmm, the code doesn't even get to the retry policy, because it throws a
> GoogleJsonResponseException rather than returning a
> TableDataInsertAllResponse with InsertErrors in it. I think the BigQuery client
> library didn't use to throw GoogleJsonResponseException in this case
> before... I'm not sure whether this is a bug in BigQueryIO, in BigQuery,
> in the BigQuery client library, or in something else, but overall it is
> definitely a bug. Would you mind filing a JIRA to track this at
> https://issues.apache.org/jira/browse/BEAM ?
>
> On Thu, Nov 23, 2017 at 4:00 AM Carsten Krebs | GameDuell <[email protected]> wrote:
> Hi,
>
> I am currently trying to stream data into a BigQuery table using Beam. As the
> retry policy for failed tuples I’m using InsertRetryPolicy.retryTransientErrors()
> (see below). Looking at the code, I expected that rows which caused an error
> with the reason “invalid” would not be retried. However, what I’m observing is
> that these rows are retried over and over again.
>
> Are my assumptions wrong? Am I doing something wrong, or is this a bug?
>
> Any help?
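Eugene's diagnosis can be illustrated with a small standalone sketch. This is plain Java, not Beam code, and it needs Java 9+ for `Set.of`/`List.of`.

The sketch assumes that `retryTransientErrors()` treats the error reasons "invalid", "invalidQuery" and "notImplemented" as persistent and retries everything else. The class, the `rowsToRetry` helper, and the use of `String` as a stand-in for `TableRow` are all hypothetical.

The key point is that the InsertAll request names the table once, so a bad partition fails the whole request. A single request-level reason then applies to every row in the batch. The sketch applies the same decision to the entire batch, so persistent failures go to the failed-inserts output instead of being retried forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class RetryClassificationSketch {

  // Assumption: the error reasons that Beam's InsertRetryPolicy.retryTransientErrors()
  // treats as persistent (never retried); verify against your Beam version.
  static final Set<String> PERSISTENT_REASONS =
      Set.of("invalid", "invalidQuery", "notImplemented");

  // Per-row decision in the style of retryTransientErrors(): retry unless persistent.
  static boolean shouldRetry(String reason) {
    return !PERSISTENT_REASONS.contains(reason);
  }

  // Hypothetical handling of a request-level failure (HTTP 400 for the whole
  // InsertAll call). Since the table is named once per request, the single
  // error reason applies to every row, so the batch is either retried as a
  // whole or moved to failedInserts as a whole.
  static List<String> rowsToRetry(List<String> batch, String requestErrorReason,
      List<String> failedInserts) {
    if (shouldRetry(requestErrorReason)) {
      return new ArrayList<>(batch);
    }
    failedInserts.addAll(batch);
    return new ArrayList<>();
  }

  public static void main(String[] args) {
    List<String> failed = new ArrayList<>();
    List<String> retry = rowsToRetry(List.of("row1", "row2"), "invalid", failed);
    // prints: retry=[] failed=[row1, row2]
    System.out.println("retry=" + retry + " failed=" + failed);
  }
}
```

With the reason "invalid", nothing is retried and both rows end up in the failed-inserts list. This is the behaviour Carsten expected from the policy.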
>
> Thanks,
>
> Carsten
>
>
> Code Snippet:
> p.apply(BigQueryIO.writeTableRows()
>         .to(new DatePartitionedTableSpecifier(tableReference, "tracking data"))
>         .withSchema(schema)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()))
>     // write all failed inserts to a DMQ
>     .getFailedInserts()
>     .apply(MapElements.via(new SimpleFunction<TableRow, PubsubMessage>() {
>       @Override
>       public PubsubMessage apply(final TableRow _row) {
>         try {
>           return new PubsubMessage(
>               JacksonFactory.getDefaultInstance().toByteArray(_row),
>               Collections.<String, String>emptyMap());
>         } catch (IOException e) {
>           throw new RuntimeException("failed to write to DMQ", e);
>         }
>       }
>     }))
>     .apply(PubsubIO.writeMessages().to("projects/gameduell-bits-bigquery-poc/topics/dmq"));
>
> Error:
> (1a04bdb0d43aca9c): java.lang.RuntimeException:
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> {
>   "code" : 400,
>   "errors" : [ {
>     "domain" : "global",
>     "message" : "The destination table's partition rum$20170925 is outside the allowed bounds.
> You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>     "reason" : "invalid"
>   } ],
>   "message" : "The destination table's partition rum$20170925 is outside the allowed bounds.
> You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>   "status" : "INVALID_ARGUMENT"
> }
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:774)
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
> org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
> org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
> Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> {
>   "code" : 400,
>   "errors" : [ {
>     "domain" : "global",
>     "message" : "The destination table's partition rum$20170925 is outside the allowed bounds.
> You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>     "reason" : "invalid"
>   } ],
>   "message" : "The destination table's partition rum$20170925 is outside the allowed bounds.
> You can only stream to partitions within 31 days in the past and 16 days in the future relative to the current date.",
>   "status" : "INVALID_ARGUMENT"
> }
> com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
> com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
> com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
> com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:720)
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.call(BigQueryServicesImpl.java:712)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
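Until BigQueryIO handles this case, one possible workaround (not discussed in the thread) is to check the partition date before writing. Rows whose partition is out of range can then be sent to the DMQ directly. The following is a minimal standalone sketch of that check.

Assumptions:
- The 31/16-day limits are taken from the error message in this thread and may have changed since.
- Treating both bounds as inclusive is a guess.
- The input carries a `$yyyyMMdd` partition decorator.
- The class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class PartitionWindowSketch {

  // Limits quoted in the 2017 BigQuery error message; treat them as
  // assumptions and check the current BigQuery documentation.
  static final long MAX_DAYS_IN_PAST = 31;
  static final long MAX_DAYS_IN_FUTURE = 16;

  // Extracts the date from a partition decorator such as "rum$20170925".
  // Assumes the decorator is present and uses the yyyyMMdd format.
  static LocalDate partitionDate(String tableWithDecorator) {
    String suffix = tableWithDecorator.substring(tableWithDecorator.indexOf('$') + 1);
    return LocalDate.parse(suffix, DateTimeFormatter.BASIC_ISO_DATE);
  }

  // True if the partition lies inside the streaming window around "today"
  // (both bounds treated as inclusive, which is an assumption).
  static boolean isStreamable(LocalDate partition, LocalDate today) {
    long daysAhead = ChronoUnit.DAYS.between(today, partition);
    return daysAhead >= -MAX_DAYS_IN_PAST && daysAhead <= MAX_DAYS_IN_FUTURE;
  }

  public static void main(String[] args) {
    LocalDate partition = partitionDate("rum$20170925");
    // Date of the first message in this thread, used as "today".
    LocalDate today = LocalDate.of(2017, 11, 23);
    // prints: 2017-09-25 streamable: false
    System.out.println(partition + " streamable: " + isStreamable(partition, today));
  }
}
```

Relative to 23 Nov 2017, the partition rum$20170925 is 59 days in the past, well outside the window. This explains why every row in that batch was rejected as "invalid".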
