BigQueryIO has been written so that it can emit the records it failed to
write, which you can then send to a "dead letter queue". Not all IO transforms
support this, but it is very useful for the ones that do.

WriteResult writeResult = p
        .apply(PubsubIO.readMessagesWithAttributes()
                .fromSubscription("<some subscription>"))
        .apply(MapElements.via(new MapMessageToBigQueryRow()))
        .apply(BigQueryIO.writeTableRows()
                .to(tableReference)
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

// Rows the retry policy gave up on come back here, ready for a dead-letter sink.
// Only populated for streaming inserts, the default for an unbounded source like Pub/Sub.
PCollection<TableRow> failedInserts = writeResult.getFailedInserts();
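For steps that don't expose failed records themselves, such as your own mapping step in front of the write, the wrapping you describe is the usual approach: catch the exception inside a DoFn and emit the offending element to an additional output identified by a TupleTag. What follows is a minimal sketch of that pattern, not a drop-in implementation. It assumes the Beam Java SDK (beam-sdks-java-core) and the direct runner (beam-runners-direct-java) are on the classpath; DeadLetterExample, ParseFn, parseWithDeadLetters and the integer parsing are hypothetical stand-ins for your real conversion logic.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterExample {
  // Main output: elements that were converted successfully.
  // (Anonymous subclasses keep the element type so Beam can infer coders.)
  static final TupleTag<Integer> PARSED = new TupleTag<Integer>() {};
  // Additional output: raw inputs whose conversion threw (the dead letters).
  static final TupleTag<String> FAILED = new TupleTag<String>() {};

  /** Hypothetical stand-in for a mapping step such as MapMessageToBigQueryRow. */
  static class ParseFn extends DoFn<String, Integer> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      String raw = c.element();
      try {
        c.output(Integer.parseInt(raw));
      } catch (NumberFormatException e) {
        // Tag the offending input instead of letting the exception fail the bundle.
        // Real code may want to catch a broader exception type.
        c.output(FAILED, raw);
      }
    }
  }

  /** Applies ParseFn and returns both outputs in one PCollectionTuple. */
  static PCollectionTuple parseWithDeadLetters(PCollection<String> input) {
    return input.apply("Parse",
        ParDo.of(new ParseFn()).withOutputTags(PARSED, TupleTagList.of(FAILED)));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollectionTuple results =
        parseWithDeadLetters(p.apply(Create.of("1", "2", "oops", "4")));

    // results.get(PARSED) continues down the normal path (e.g. into BigQueryIO).
    // results.get(FAILED) would go to a dead-letter sink; here it is just printed.
    results.get(FAILED).apply("PrintDeadLetters", ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        System.out.println("dead letter: " + c.element());
      }
    }));

    p.run().waitUntilFinish();
  }
}
```

With this shape a bad element no longer fails the bundle and gets retried forever; it simply lands in results.get(FAILED), which you can write to another BigQuery table, a Pub/Sub topic or files, just like the failed rows BigQueryIO hands back.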

On Mon, Nov 20, 2017 at 7:07 AM, Carsten Krebs | GameDuell wrote:

> Hi,
>
> I’m new to Apache Beam and am currently trying to figure out the right way
> to deal with errors in an I/O transformation. Generally, I would like to
> filter out tuples that could not be written for whatever reason and
> write them to some sort of “dead letter queue”.
>
> What is the right way to tag tuples that have led to an exception while
> writing?
>
> Currently my very simple pipeline looks like:
>
> p.apply(PubsubIO.readMessagesWithAttributes()
>         .fromSubscription("<some subscription>"))
>         .apply(MapElements.via(new MapMessageToBigQueryRow()))
>         .apply(BigQueryIO.writeTableRows().to(tableReference)
>                 .withSchema(schema)
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                 .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>         );
>
>
> Do I have to wrap the BigQueryIO.Write instance in some kind of custom
> transformation, catching exceptions and tagging the input values somehow?
> Are there better, more appropriate ways to do this?
>
> Thanks in advance,
>
> Carsten
