Note that this works only for streaming inserts: when a batch load fails, it
is not possible to isolate which individual rows caused the failure.
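A toy model may help show why per-row results matter. The plain-Java sketch below does not use Beam or BigQuery. It assumes a hypothetical `insertRow` function that returns `null` on success or an error-reason string on failure, and it assumes, as a simplification, that the reason `invalid` is permanent while every other reason is transient. Transient failures are retried; rows that fail permanently, or that exhaust the retry cap (another assumption of this sketch), are collected into a dead-letter list. That is roughly the role the retry policy and the failed-inserts output play in the snippet further down. A batch load gives no such per-row result to branch on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class StreamingInsertModel {
    // Assumption for this toy model: only "invalid" is a permanent error;
    // any other error reason is treated as transient and retried.
    static final Set<String> PERSISTENT_REASONS = Set.of("invalid");

    /**
     * Toy model of streaming inserts: every row gets its own result, so one
     * bad row can be retried or dead-lettered without affecting the others.
     *
     * @param insertRow   hypothetical per-row insert; returns null on success,
     *                    otherwise an error reason string
     * @param maxAttempts retry cap per row (an assumption of this sketch)
     * @return the rows that could not be inserted (the "dead letter queue")
     */
    static List<String> insertAll(List<String> rows,
                                  Function<String, String> insertRow,
                                  int maxAttempts) {
        List<String> deadLetters = new ArrayList<>();
        for (String row : rows) {
            for (int attempt = 1; ; attempt++) {
                String reason = insertRow.apply(row);
                if (reason == null) {
                    break; // inserted successfully
                }
                if (PERSISTENT_REASONS.contains(reason) || attempt >= maxAttempts) {
                    deadLetters.add(row); // give up on this row only
                    break;
                }
                // transient failure: loop around and retry the same row
            }
        }
        return deadLetters;
    }
}
```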

On Mon, Nov 20, 2017 at 10:01 AM Lukasz Cwik <[email protected]> wrote:

> BigQueryIO has been written to support emitting failed records to a
> "dead letter queue". Not all IO transforms support this, but it is very
> useful for the ones that do.
>
> WriteResult writeResult = p.apply(PubsubIO.readMessagesWithAttributes()
>         .fromSubscription("<some subscription>"))
>         .apply(MapElements.via(new MapMessageToBigQueryRow()))
>         .apply(BigQueryIO.writeTableRows().to(tableReference)
>                 .withSchema(schema)
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                 .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>         );
> PCollection<TableRow> failedInserts = writeResult.getFailedInserts();
>
> On Mon, Nov 20, 2017 at 7:07 AM, Carsten Krebs | GameDuell <[email protected]> wrote:
>
>> Hi,
>>
>> I’m new to Apache Beam and am currently trying to figure out the right way
>> to deal with errors in an I/O transformation. Generally, I would like to
>> filter out tuples that could not be written for whatever reason and
>> write them to some sort of “dead letter queue”.
>>
>> What is the right way to tag tuples that caused an exception while
>> writing?
>>
>> Currently my very simple pipeline looks like:
>>
>> p.apply(PubsubIO.readMessagesWithAttributes()
>>         .fromSubscription("<some subscription>"))
>>         .apply(MapElements.via(new MapMessageToBigQueryRow()))
>>         .apply(BigQueryIO.writeTableRows().to(tableReference)
>>                 .withSchema(schema)
>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>                 .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>>         );
>>
>>
>> Do I have to wrap the BigQueryIO.Write instance in some kind of custom
>> transformation that catches exceptions and tags the input values somehow?
>> Are there better, more appropriate ways to do this?
>>
>> Thanks in advance,
>>
>> Carsten
>
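BigQueryIO exposes failed streaming inserts itself, as Lukasz describes. For an IO transform that does not, the catch-and-tag approach Carsten asks about can be sketched in plain Java as below. This only illustrates the control flow and is not Beam code: `DeadLetterWriter`, `writeAll`, and the `writer` callback are hypothetical names. In a Beam pipeline the same idea would typically live in a `DoFn` that emits failures to an additional output, declared with `ParDo.of(...).withOutputTags(mainTag, TupleTagList.of(deadLetterTag))`, where the tag names are again placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DeadLetterWriter {
    /** An input that could not be written, kept together with the reason. */
    static final class Failed<T> {
        final T input;
        final String error;

        Failed(T input, String error) {
            this.input = input;
            this.error = error;
        }
    }

    /** Successfully written inputs plus the "dead letter queue". */
    static final class Result<T> {
        final List<T> written = new ArrayList<>();
        final List<Failed<T>> deadLetters = new ArrayList<>();
    }

    /**
     * Calls the hypothetical writer on each input. An exception from one
     * input is caught and that input is tagged as failed, instead of
     * failing the whole run.
     */
    static <T> Result<T> writeAll(List<T> inputs, Consumer<T> writer) {
        Result<T> result = new Result<>();
        for (T input : inputs) {
            try {
                writer.accept(input);
                result.written.add(input);
            } catch (RuntimeException e) {
                result.deadLetters.add(new Failed<>(input, e.getMessage()));
            }
        }
        return result;
    }
}
```

Catching `RuntimeException` rather than `Throwable` keeps errors such as `OutOfMemoryError` fatal, which is usually the desired behavior for a dead-letter path.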
