BigQueryIO is written to support emitting failed records to a "dead letter
queue". Not all IO transforms support this, but it is very useful for the
ones that do. For example, with your pipeline:
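import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;

WriteResult writeResult = p.apply(PubsubIO.readMessagesWithAttributes()
        .fromSubscription("<some subscription>"))
    .apply(MapElements.via(new MapMessageToBigQueryRow()))
    .apply(BigQueryIO.writeTableRows().to(tableReference)
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

// Rows that could not be inserted (and were not retried): your dead letter queue.
PCollection<TableRow> failedWrites = writeResult.getFailedInserts();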
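To make the division of labour concrete, here is a small plain-Java model of what the retry policy and the failed-rows output do together. It deliberately has no Beam dependency: `FailedInsertModel`, `InsertStatus` and the `maxAttempts` cap are hypothetical names for illustration, and the real error classification and retry loop live inside BigQueryIO. Note also that failed rows are only reported for streaming inserts, which is what BigQueryIO uses by default for an unbounded source such as Pub/Sub.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// A plain-Java model of the failed-insert flow; this is NOT the Beam API.
// InsertStatus, maxAttempts, and the String "rows" are hypothetical stand-ins.
final class FailedInsertModel {

  /** How the (modelled) sink classifies a single insert attempt. */
  enum InsertStatus { OK, TRANSIENT_ERROR, PERSISTENT_ERROR }

  /** Rows that were written, and rows handed back as failed inserts. */
  static final class Result {
    final List<String> written = new ArrayList<>();
    final List<String> failedInserts = new ArrayList<>();
  }

  /**
   * Mimics a "retry transient errors" policy: transient failures are retried
   * (capped at maxAttempts in this model), persistent failures are not.
   */
  static Result writeAll(
      List<String> rows, Function<String, InsertStatus> insert, int maxAttempts) {
    Result result = new Result();
    for (String row : rows) {
      InsertStatus status = InsertStatus.TRANSIENT_ERROR;
      for (int attempt = 0; attempt < maxAttempts; attempt++) {
        status = insert.apply(row);
        if (status != InsertStatus.TRANSIENT_ERROR) {
          break; // success, or a persistent error that retrying cannot fix
        }
      }
      if (status == InsertStatus.OK) {
        result.written.add(row);
      } else {
        // Persistent error (or transient retries exhausted in this model):
        // the row goes to the dead letter output instead of failing the job.
        result.failedInserts.add(row);
      }
    }
    return result;
  }
}
```

The point of the design is that a bad row costs one element of output rather than a stuck or failed pipeline; what happens to the dead-letter rows (logging them, writing them to another table, publishing them elsewhere) is left to the caller.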
On Mon, Nov 20, 2017 at 7:07 AM, Carsten Krebs | GameDuell wrote:
> Hi,
>
> I’m new to Apache Beam and am currently trying to figure out the right way
> to deal with errors in an I/O transformation. Generally, I would like to
> filter out tuples that could not be written, for whatever reason, and
> write them to some sort of “dead letter queue”.
>
> What is the right way to tag tuples that led to an exception while
> writing?
>
> Currently my very simple pipeline looks like:
>
> p.apply(PubsubIO.readMessagesWithAttributes()
>         .fromSubscription("<some subscription>"))
>     .apply(MapElements.via(new MapMessageToBigQueryRow()))
>     .apply(BigQueryIO.writeTableRows().to(tableReference)
>         .withSchema(schema)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>     );
>
> Do I have to wrap the BigQueryIO.Write instance in some kind of custom
> transformation that catches exceptions and tags the input values somehow?
> Are there better, more appropriate ways to do this?
>
> Thanks in advance,
>
> Carsten
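For steps that have no built-in dead-letter output, the per-element catch-and-tag idea from the quoted question is the usual fallback. In Beam it is typically written as a `ParDo` whose `DoFn` emits to a main output plus an extra dead-letter output (`ParDo.of(...).withOutputTags(...)` with `TupleTag`s). Wrapping `BigQueryIO.Write` in a composite transform would not achieve this by itself, since `apply` only builds the pipeline graph and a write failure happens later, inside the sink. The sketch below models only the idea in plain Java with no Beam API; `CatchAndTag`, `DeadLetter` and `writeAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of "catch the exception, tag the input"; NOT the Beam API.
// CatchAndTag, DeadLetter, and writeAll are hypothetical names.
final class CatchAndTag {

  /** A failed input, kept together with a description of the error. */
  static final class DeadLetter<T> {
    final T input;
    final String error;

    DeadLetter(T input, String error) {
      this.input = input;
      this.error = error;
    }
  }

  /** The "main output" and the "dead letter output" of one processing step. */
  static final class Tagged<T> {
    final List<T> succeeded = new ArrayList<>();
    final List<DeadLetter<T>> deadLetters = new ArrayList<>();
  }

  /** Applies write to each element; a thrown exception tags that element instead of aborting. */
  static <T> Tagged<T> writeAll(List<T> inputs, Consumer<T> write) {
    Tagged<T> result = new Tagged<>();
    for (T input : inputs) {
      try {
        write.accept(input);
        result.succeeded.add(input);
      } catch (RuntimeException e) {
        // Keep the original input so it can be inspected or replayed later.
        result.deadLetters.add(new DeadLetter<>(input, e.toString()));
      }
    }
    return result;
  }
}
```

Keeping the original input, not just the error message, in each dead-letter record is what makes the queue useful: the records can be fixed and replayed later.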