Hi Reuven,
How will Dataflow know which particular message triggered that exception if I
am not letting exceptions bubble up?

Regarding dead-letter queues:
I am sending the messages to a discard queue from my catch block, but at the
moment I don't have a job to reprocess those messages, so I am
trying to minimize the number of messages going to the dead-letter queue.
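
For reference, here is a minimal sketch of the kind of check I have in mind for
my catch-all. In the stack trace the KeyTokenInvalidException shows up only as
the cause of a plain RuntimeException, so the sketch walks the cause chain
instead of catching the type directly. Everything named here (RetryableCheck,
isRetryable, process, discarded, RETRYABLE_NAMES) is hypothetical, and the
nested KeyTokenInvalidException is only a stand-in so the example compiles
without Beam on the classpath. Matching by simple class name is an assumption,
not something Beam provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class RetryableCheck {

    // Stand-in for the Dataflow worker's exception (assumption: the real class
    // lives in the worker jar; this copy exists only for the sketch).
    static class KeyTokenInvalidException extends RuntimeException {
        KeyTokenInvalidException(String message) {
            super(message);
        }
    }

    // Hypothetical allow-list of exception names the runner should retry.
    static final Set<String> RETRYABLE_NAMES = Set.of("KeyTokenInvalidException");

    // Returns true if the throwable or any of its causes is on the allow-list.
    // The depth bound guards against a cyclic cause chain.
    static boolean isRetryable(Throwable t) {
        for (int depth = 0; t != null && depth < 32; depth++, t = t.getCause()) {
            if (RETRYABLE_NAMES.contains(t.getClass().getSimpleName())) {
                return true;
            }
        }
        return false;
    }

    // Stand-in for the discard topic.
    static final List<String> discarded = new ArrayList<>();

    // Mimics the body of processElement: retryable failures are rethrown so the
    // runner can retry the work; everything else goes to the discard list.
    static void process(String msg, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            if (isRetryable(e)) {
                throw e;
            }
            discarded.add(msg);
        }
    }

    public static void main(String[] args) {
        // An unexpected bug: swallowed and discarded.
        process("bad", () -> { throw new NullPointerException("bug"); });

        // A wrapped retryable error: rethrown to the caller.
        try {
            process("retry", () -> {
                throw new RuntimeException(
                    "Unable to read value from state",
                    new KeyTokenInvalidException("token mismatch"));
            });
        } catch (RuntimeException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
        System.out.println("discarded: " + discarded);
    }
}
```

This only covers exception types I already know about, so it does not replace a
generic superclass; it just keeps the list of rethrown types in one place.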

On Mon, Dec 20, 2021 at 10:34 AM Reuven Lax <[email protected]> wrote:

> This particular error is probably fine to swallow - I believe this error
> implies that Dataflow will be retrying the work anyway.
>
> On Mon, Dec 20, 2021 at 10:14 AM Robert Bradshaw <[email protected]>
> wrote:
>
>> I don't think there's such a generic superclass. For example, the
>> KeyTokenInvalidException in question simply extends RuntimeException.
>>
>>
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/KeyTokenInvalidException.java#L24
>>
>> I can see how that could be useful, though underlying libraries could
>> throw transient errors of their own so it'd be hard to track them all
>> down.
>>
>> On a related note, rather than just swallowing the errors, have you
>> considered using the dead letter pattern:
>>
>> https://cloud.google.com/architecture/building-production-ready-data-pipelines-using-dataflow-developing-and-testing#use_dead_letter_queues
>> ?
>>
>>
>>
>> On Sat, Dec 18, 2021 at 12:00 PM gaurav mishra
>> <[email protected]> wrote:
>> >
>> > Hi,
>> > I have a stateful DoFn in my pipeline. My processElement code is
>> wrapped in a try/catch block.
>> > I have multiple catch blocks to handle the failures differently. The
>> last catch block is a catch-all Exception block.
>> >
>> > try {}
>> > catch(BadDataException e) {}
>> > catch(SomeOtherCustomException e1) {}
>> > catch(Exception e2) {}
>> >
>> > The intent of the last catch-all was to avoid scenarios
>> where the pipeline stops making progress because some unexpected
>> message or condition (like an NPE in the code) causes messages to be retried
>> indefinitely. None of those exceptions are allowed to bubble up: either the
>> message is sent to a discard topic, or a message is logged and we move on to the next one.
>> > As a consequence of this I am seeing one particular error coming from
>> internals of Dataflow when reading state spec.
>> > java.lang.RuntimeException: Unable to read value from state at
>> org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillValue.read(WindmillStateInternals.java:397)
>> > ...
>> > Caused by:
>> org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException:
>> >
>> > From what I have read, this appears to be a recoverable error.
>> Since I don't allow any error to bubble up, I believe the messages that cause
>> that recoverable error will just be dropped and will never end up in the
>> output.
>> >  I am wondering if there is a way to distinguish such
>> recoverable errors from others.
>> >  Is there a superclass that all such recoverable exceptions inherit
>> from, so I can do the following?
>> > ....
>> > catch(RecoverableException e) { throw e; }
>> > catch(Exception e) {}
>> >
>> > I can handle this particular case by catching KeyTokenInvalidException
>> and rethrowing it, but I am looking for a more generic solution if there is one.
>> >
>>
>
