Hi
unfortunatly no. Think about "caching" these records popping outta there
or multiple step Tasks (join,aggregate,repartiton all in one go) last
repartitioner might throw cause it cant determine the partition only
because a get on the join store cause a flush through the aggregates.
This has nothing todo with a ConsumerRecord at all. Especially not the
one we most recently processed.
To be completly honest. All but grining to a hold is not appealing to me
at all. Sure maybe lagmonitoring will call me on Sunday but I can at
least be confident its working the rest of the time.
Best Jan
PS.:
Hope you get my point. I am mostly complaing about
|public| |interface| |RecordExceptionHandler {|
|||/**|
|||* Inspect a record and the exception received|
|||*/|
|||HandlerResponse handle(that guy here >>>>>>>
ConsumerRecord<||byte||[], ||byte||[]> record, Exception exception);|
|}|
||
|public| |enum| |HandlerResponse {|
|||/* continue with processing */|
|||CONTINUE(||1||), |
|||/* fail the processing and stop */|
|||FAIL(||2||);|
|}|
On 26.05.2017 11:18, Eno Thereska wrote:
Thanks Jan,
The record passed to the handler will always be the problematic record. There
are 2 cases/types of exceptions for the purposes of this KIP: 1) any exception
during deserialization. The bad record + the exception (i.e.
DeserializeException) will be passed to the handler. The handler will be able
to tell this was a deserialization error.
2) any exception during processing of this record. So whenever a processor gets
the record (after some caching, etc) it starts to process it, then it fails,
then it will call the handler with this record.
Does that match your thinking?
Thanks,
Eno
On 26 May 2017, at 09:51, Jan Filipiak <jan.filip...@trivago.com> wrote:
Hi,
quick question: From the KIP it doesn't quite makes sense to me how that fits
with caching.
With caching the consumer record might not be at all related to some processor
throwing while processing.
would it not make more sense to get the ProcessorName + object object for
processing and
statestore or topic name + byte[] byte[] for serializers? maybe passing in the
used serdes?
Best Jan
On 25.05.2017 11:47, Eno Thereska wrote:
Hi there,
I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+processing+exception+handlers>
Discussion and feedback is welcome, thank you.
Eno