Also, I would like to point out that your suggestion of wrapping existing I/O transforms with custom transforms will not work unless the underlying transform already emits failed writes to a separate PCollection. Custom transforms are expanded into their constituent transforms during job submission, while errors in the underlying transforms are raised later, at execution time, and are handled by the runner (not by the wrapping transform). A try/catch in the wrapper therefore never sees a failed write.
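To make the distinction concrete, here is a toy model in plain Java. It is not Beam code: Graph, Composite, FailingWrite, CatchingWrapper and runAndCollectRunnerFailures are hypothetical stand-ins invented for this sketch, and the "runner" simply records failing elements, whereas a real runner would typically retry or fail the job. The only point it illustrates is that expand() runs before any element is processed, so a catch block around it has nothing to catch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: none of these types are Beam classes.
class WrapperSketch {

    /** Stand-in for a pipeline graph: per-element steps that run later. */
    static final class Graph {
        final List<Consumer<String>> steps = new ArrayList<>();
    }

    /** Stand-in for a composite transform: expand() only adds steps to the graph. */
    interface Composite {
        void expand(Graph graph);
    }

    /** Stand-in for an I/O write that throws on "bad" elements when executed. */
    static final class FailingWrite implements Composite {
        @Override
        public void expand(Graph graph) {
            graph.steps.add(element -> {
                if (element.startsWith("bad")) {
                    throw new IllegalStateException("write failed: " + element);
                }
            });
        }
    }

    /** Wrapper that tries to catch write errors around the inner transform. */
    static final class CatchingWrapper implements Composite {
        final Composite inner;
        boolean caughtDuringExpand = false;

        CatchingWrapper(Composite inner) {
            this.inner = inner;
        }

        @Override
        public void expand(Graph graph) {
            try {
                inner.expand(graph); // only builds the graph; nothing is written here
            } catch (RuntimeException e) {
                caughtDuringExpand = true; // not reached for write-time failures
            }
        }
    }

    /** Stand-in for a runner: executes the steps and sees the failures itself. */
    static List<String> runAndCollectRunnerFailures(Graph graph, List<String> input) {
        List<String> failures = new ArrayList<>();
        for (String element : input) {
            try {
                for (Consumer<String> step : graph.steps) {
                    step.accept(element);
                }
            } catch (RuntimeException e) {
                failures.add(element);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        CatchingWrapper wrapper = new CatchingWrapper(new FailingWrite());
        wrapper.expand(graph); // corresponds to job submission
        List<String> seenByRunner =
            runAndCollectRunnerFailures(graph, Arrays.asList("ok-1", "bad-2", "ok-3"));
        System.out.println("caught in wrapper: " + wrapper.caughtDuringExpand);
        System.out.println("failures seen by runner: " + seenByRunner);
    }
}
```

In this sketch the wrapper's catch block is never entered, and only the stand-in runner observes the failing element. That is why the failed records have to come out of the underlying transform itself as a separate output.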
Thanks,
Cham

On Mon, Nov 20, 2017 at 10:20 AM Eugene Kirpichov <[email protected]> wrote:

> Note that this works only for streaming inserts, because with failed batch
> loads it is not possible to isolate which individual writes failed.
>
> On Mon, Nov 20, 2017 at 10:01 AM Lukasz Cwik <[email protected]> wrote:
>
>> BigQueryIO has been written in such a way as to support emitting failed
>> records to a "dead letter queue". Not all IO transforms support this, but
>> it is very useful for the ones that do.
>>
>> WriteResult writeResult = p.apply(PubsubIO.readMessagesWithAttributes()
>>         .fromSubscription("<some subscription>"))
>>     .apply(MapElements.via(new MapMessageToBigQueryRow()))
>>     .apply(BigQueryIO.writeTableRows().to(tableReference)
>>         .withSchema(schema)
>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>>     );
>> PCollection<TableRow> failedWrites = writeResult.getFailedInserts();
>>
>> On Mon, Nov 20, 2017 at 7:07 AM, Carsten Krebs | GameDuell
>> <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I'm new to Apache Beam and am currently trying to figure out the right
>>> way to deal with errors in an I/O transformation. Generally, I would like
>>> to filter out tuples which could not be written, for whatever reason, and
>>> write them to some sort of "dead letter queue".
>>>
>>> What is the right way to tag tuples that led to an Exception while
>>> writing?
>>>
>>> Currently my very simple pipeline looks like:
>>>
>>> p.apply(PubsubIO.readMessagesWithAttributes()
>>>         .fromSubscription("<some subscription>"))
>>>     .apply(MapElements.via(new MapMessageToBigQueryRow()))
>>>     .apply(BigQueryIO.writeTableRows().to(tableReference)
>>>         .withSchema(schema)
>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>>>     );
>>>
>>> Do I have to wrap the BigQueryIO.Write instance in some kind of custom
>>> transformation, catching Exceptions and tagging the input values somehow?
>>> Are there better, more appropriate ways to do this?
>>>
>>> Thanks in advance,
>>>
>>> Carsten
