Hi Reuven,

How will Dataflow know which particular message triggered that exception when I am not letting exceptions bubble up?
Regarding dead-letter queues: I am sending the messages to a discard queue from my catch block, but at the moment I don't have any job to reprocess those messages, so I am trying to minimize the number of messages going to the dead-letter queue.

On Mon, Dec 20, 2021 at 10:34 AM Reuven Lax wrote:
> This particular error is probably fine to swallow; I believe this error
> implies that Dataflow will be retrying the work anyway.
>
> On Mon, Dec 20, 2021 at 10:14 AM Robert Bradshaw wrote:
>
>> I don't think there's such a generic superclass. For example, the
>> KeyTokenInvalidException in question simply extends RuntimeException.
>>
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/KeyTokenInvalidException.java#L24
>>
>> I can see how that could be useful, though underlying libraries could
>> throw transient errors of their own, so it'd be hard to track them all
>> down.
>>
>> On a related note, rather than just swallowing the errors, have you
>> considered using the dead letter pattern?
>>
>> https://cloud.google.com/architecture/building-production-ready-data-pipelines-using-dataflow-developing-and-testing#use_dead_letter_queues
>>
>> On Sat, Dec 18, 2021 at 12:00 PM gaurav mishra wrote:
>> >
>> > Hi,
>> > I have a stateful DoFn in my pipeline. My processElement's code is wrapped in a try-catch block.
>> > I have multiple catch blocks to handle the failures differently. The last catch block is a catch-all exception block.
>> >
>> > try {}
>> > catch (BadDataException e) {}
>> > catch (SomeOtherCustomException e1) {}
>> > catch (Exception e2) {}
>> >
>> > The intent of the last catch-all here was to avoid scenarios where the pipeline stops making progress because some unexpected message or scenario (like an NPE in the code) causes messages to be retried indefinitely. None of those exceptions are allowed to bubble up: either the message is sent to a discard topic, or an error is logged and we move on to the next message.
>> > As a consequence of this, I am seeing one particular error coming from the internals of Dataflow when reading a state spec:
>> >
>> > java.lang.RuntimeException: Unable to read value from state
>> >     at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillValue.read(WindmillStateInternals.java:397)
>> > ...
>> > Caused by: org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException:
>> >
>> > Reading about this error, it seems to be a recoverable type of error. Since I don't allow any error to bubble up, I believe the messages that cause that recoverable error will just be dropped and will never end up in the output.
>> > I am wondering if there is a way to distinguish between such recoverable errors and others.
>> > Is there a superclass that all such recoverable exceptions inherit from, so I can do:
>> >
>> > ...
>> > catch (RecoverableException e) { throw e; }
>> > catch (Exception e) {}
>> >
>> > I can handle this particular case by catching KeyTokenInvalidException and rethrowing it, but I am looking for a more generic solution if there is one.
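A minimal sketch of the "catch KeyTokenInvalidException and rethrow" idea, assuming Java 9+ (for `Set.of`). In the quoted stack trace the KeyTokenInvalidException appears as the *cause* of a RuntimeException ("Unable to read value from state"), so a plain `catch (KeyTokenInvalidException e)` clause would likely not match it; the sketch therefore walks the cause chain. `RetryableErrors`, `hasRetryableCause`, and `rethrowIfRetryable` are hypothetical names, not Beam APIs. Since there is no generic recoverable-exception superclass, the name set holds only the one exception identified in this thread, matched by fully qualified class name so the pipeline code needs no compile-time dependency on Dataflow worker internals.

```java
import java.util.Set;

/**
 * Hypothetical helper (not part of Beam): decides whether an exception caught by a
 * catch-all block should be rethrown so the runner can retry the work, rather than
 * being swallowed or sent to a discard topic.
 */
public final class RetryableErrors {

  // Assumption: the only runner-internal retryable exception named in this thread.
  // Other transient errors (e.g. from client libraries) are not covered.
  public static final Set<String> DEFAULT_RETRYABLE =
      Set.of("org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException");

  private static final int MAX_CAUSE_DEPTH = 32;

  private RetryableErrors() {}

  /** Returns true if t, or any exception in its cause chain, has one of the given class names. */
  public static boolean hasRetryableCause(Throwable t, Set<String> retryableClassNames) {
    int depth = 0;
    // The depth limit guards against (rare) cyclic cause chains.
    for (Throwable cur = t; cur != null && depth < MAX_CAUSE_DEPTH; cur = cur.getCause(), depth++) {
      if (retryableClassNames.contains(cur.getClass().getName())) {
        return true;
      }
    }
    return false;
  }

  /**
   * Rethrows e if it wraps a retryable error; otherwise returns normally so the caller
   * can log the error or route the element to a dead-letter output.
   */
  public static void rethrowIfRetryable(Exception e, Set<String> retryableClassNames) {
    if (hasRetryableCause(e, retryableClassNames)) {
      if (e instanceof RuntimeException) {
        throw (RuntimeException) e; // rethrow unchanged, preserving the original stack trace
      }
      throw new RuntimeException(e); // checked exceptions are wrapped
    }
  }
}
```

In the catch-all block, calling `RetryableErrors.rethrowIfRetryable(e2, RetryableErrors.DEFAULT_RETRYABLE)` before logging or routing the element to the discard topic lets this one known-recoverable error propagate while everything else is still handled locally.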
