Note that this works only for streaming inserts, because with failed batch loads it is not possible to isolate which individual writes failed.
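The sketch below shows the dead-letter idea outside Beam. It is a minimal plain-Java illustration, not Beam's API and not how BigQueryIO is implemented. Each row is written on its own. Failures treated as transient are retried a bounded number of times, and rows that still fail go into a dead-letter list instead of failing the whole write. `RowWriter`, `TransientWriteException` and `MAX_ATTEMPTS` are hypothetical names, and the bounded retry count is an assumption of this sketch. Failures can only be traced back to rows because each row is written individually, which is the same reason the dead-letter output is limited to streaming inserts.

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {

    // Hypothetical: signals an error worth retrying (e.g. a timeout).
    static class TransientWriteException extends RuntimeException {
        TransientWriteException(String message) {
            super(message);
        }
    }

    // Hypothetical per-row writer; throws if the row cannot be written.
    interface RowWriter {
        void write(String row);
    }

    // Assumed bound on attempts per row (first try plus retries).
    static final int MAX_ATTEMPTS = 3;

    // Writes each row on its own and returns the rows that failed ("dead letters").
    static List<String> writeAll(List<String> rows, RowWriter writer) {
        List<String> deadLetters = new ArrayList<>();
        for (String row : rows) {
            if (!writeWithRetry(row, writer)) {
                deadLetters.add(row);
            }
        }
        return deadLetters;
    }

    // Retries transient failures up to MAX_ATTEMPTS; gives up at once on anything else.
    static boolean writeWithRetry(String row, RowWriter writer) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                writer.write(row);
                return true;
            } catch (TransientWriteException e) {
                // Transient: loop and try again.
            } catch (RuntimeException e) {
                // Permanent (e.g. a schema mismatch): retrying will not help.
                return false;
            }
        }
        return false; // Still failing after MAX_ATTEMPTS.
    }

    public static void main(String[] args) {
        int[] flakyCalls = {0};
        RowWriter writer = row -> {
            if (row.startsWith("bad")) {
                throw new IllegalArgumentException("unknown field in " + row);
            }
            if (row.equals("flaky") && flakyCalls[0]++ == 0) {
                throw new TransientWriteException("timeout"); // fails once, then succeeds
            }
            if (row.equals("down")) {
                throw new TransientWriteException("backend unavailable");
            }
        };
        List<String> failed =
            writeAll(List.of("ok-1", "bad-row", "flaky", "down", "ok-2"), writer);
        System.out.println("dead letters: " + failed); // prints "dead letters: [bad-row, down]"
    }
}
```

In a Beam pipeline, the counterpart of `deadLetters` is the `PCollection<TableRow>` of failed rows obtained from the `WriteResult`. That collection can then be written to a separate table or topic for inspection.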
On Mon, Nov 20, 2017 at 10:01 AM Lukasz Cwik wrote:
> BigQueryIO has been written in such a way as to support emitting failed
> records to a "dead letter queue". Not all IO transforms support this, but it
> is very useful for the ones that do.
>
> WriteResult writeResult = p.apply(PubsubIO.readMessagesWithAttributes()
>         .fromSubscription("<some subscription>"))
>     .apply(MapElements.via(new MapMessageToBigQueryRow()))
>     .apply(BigQueryIO.writeTableRows().to(tableReference)
>         .withSchema(schema)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
> PCollection<TableRow> failedWrites = writeResult.getFailedInserts();
>
> On Mon, Nov 20, 2017 at 7:07 AM, Carsten Krebs | GameDuell wrote:
>
>> Hi,
>>
>> I’m new to Apache Beam and currently trying to figure out the right way to
>> deal with errors in an I/O transformation. Generally, I would like to
>> filter out tuples that could not be written for whatever reason and
>> write them to some sort of “dead letter queue”.
>>
>> What is the right way to tag tuples that have led to an Exception while
>> writing?
>>
>> Currently my very simple pipeline looks like:
>>
>> p.apply(PubsubIO.readMessagesWithAttributes()
>>         .fromSubscription("<some subscription>"))
>>     .apply(MapElements.via(new MapMessageToBigQueryRow()))
>>     .apply(BigQueryIO.writeTableRows().to(tableReference)
>>         .withSchema(schema)
>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
>>
>> Do I have to wrap the BigQueryIO.Write instance in some kind of custom
>> transformation, catching Exceptions and tagging the input values somehow?
>> Are there better, more appropriate ways to do this?
>>
>> Thanks in advance,
>>
>> Carsten
