Your code for setting the handler seems right to me.

Another double checking: have you turned on DEBUG level metrics recording
in order for this metric? Note skippedDueToDeserializationError is recorded
as DEBUG level so you need to set metrics.recording.level accordingly
(default is INFO). Lower level metrics than this config will not be
recorded nor reported.

Also note that turning on DEBUG level metrics recording has an impact on
your application's performance, since it is mostly for finer granularity
(per processor node, per task, etc) and hence recording overhead is higher
than those INFO metrics which are for global thread-level sensors.


Guozhang


On Tue, Jan 30, 2018 at 4:23 AM, Srikanth <srikanth...@gmail.com> wrote:

> Guozhang,
>
> Here is the snippet.
>
> private Properties getProperties() {
>     Properties p = new Properties();
>     ...
>     p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaConfig.getString("
> streamThreads"));
>     p.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_
> HANDLER_CLASS_CONFIG,
> LogAndContinueExceptionHandler.class);
>     ...
>     return p;
>   }
>
> StreamsConfig streamsConfig = new StreamsConfig(getProperties())
> KafkaStreams kafkaStreams = new KafkaStreams(streamBuilder.
> build(),streamsConfig);
>
> Srikanth
>
> On Mon, Jan 29, 2018 at 11:10 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hi Srikanth,
> >
> > How did you set the LogAndContinueExceptionHandler in the configs? Could
> > you copy the code snippet here?
> >
> > Guozhang
> >
> >
> > On Sun, Jan 28, 2018 at 11:26 PM, Srikanth <srikanth...@gmail.com>
> wrote:
> >
> > > Kafka-streams version "1.0.0".
> > >
> > > Thanks,
> > > Srikanth
> > >
> > > On Mon, Jan 29, 2018 at 12:23 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hello Srikanth,
> > > >
> > > > Which version of Kafka are you using? I'd like to dig for that
> > particular
> > > > branch again.
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Jan 28, 2018 at 8:54 AM, Srikanth <srikanth...@gmail.com>
> > wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > While I understand that this metric is meaningless when handler is
> > set
> > > to
> > > > > FAIL, in my case I'm actually using LogAndContinueExceptionHandler
> .
> > > > > In this case, app needs to report such occurrences. What I noticed
> is
> > > > that
> > > > > only skipped-records is set.
> > > > > The granularity offered by skippedDueToDeserializationError is
> > > missing.
> > > > >
> > > > > Srikanth
> > > > >
> > > > > On Fri, Jan 26, 2018 at 10:45 PM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Srikanth,
> > > > > >
> > > > > > Looked at the source code once again and discussing with other
> > > > committer
> > > > > I
> > > > > > now remembered why we designed it that way: when you set the
> > > > > > HandlerResponse to FAIL, it means that once a "poison record" is
> > > > > received,
> > > > > > stop the world by throwing this exception all the way up. And
> hence
> > > at
> > > > > that
> > > > > > time the application would be stopped anyways so we would not
> need
> > to
> > > > > > record this metric.
> > > > > >
> > > > > > So in other words, I think it is rather a documentation
> improvement
> > > > that
> > > > > we
> > > > > > should do.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 26, 2018 at 8:56 AM, Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Helo Srikanth,
> > > > > > >
> > > > > > > Thanks for reporting this, as I checked the code
> > > > > > skippedDueToDeserializati
> > > > > > > onError is effectively only recorded when the
> > > > > DeserializationHandlerResp
> > > > > > > onse is not set to FAIL. I agree it is not exactly matching the
> > > > > > > documentation's guidance, and will try to file a JIRA and fix
> it.
> > > > > > >
> > > > > > > As for skippedDueToDeserializationError and
> > skipped-records-rate,
> > > > > there
> > > > > > > is an open JIRA discussing about this: https://issues.apache.
> > > > > > > org/jira/browse/KAFKA-6376, just FYI.
> > > > > > >
> > > > > > >
> > > > > > > Could you share on which version of Kafka did you observe this
> > > issue?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Fri, Jan 26, 2018 at 6:30 AM, Srikanth <
> srikanth...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Hello,
> > > > > > >>
> > > > > > >> As per doc when LogAndContinueExceptionHandler is used it will
> > set
> > > > > > >> skippedDueToDeserializationError-rate metric to indicate
> > > > > > deserialization
> > > > > > >> error.
> > > > > > >> I notice that it is never set. Instead skipped-records-rate is
> > > set.
> > > > My
> > > > > > >> understanding was that skipped-records-rate is set due to
> > > timestamp
> > > > > > >> extraction errors.
> > > > > > >>
> > > > > > >> Ex, I sent a few invalid records to a topic and was able to
> see
> > > > > > exception
> > > > > > >> during deserialization.
> > > > > > >>
> > > > > > >> org.apache.kafka.common.errors.SerializationException: Error
> > > > > > >> deserializing
> > > > > > >> Avro message for id -1
> > > > > > >> Caused by: org.apache.kafka.common.
> > errors.SerializationException:
> > > > > > Unknown
> > > > > > >> magic byte!
> > > > > > >> 18/01/24 06:50:09 WARN StreamThread: Exception caught during
> > > > > > >> Deserialization, taskId: 0_0, topic: docker.event.1,
> partition:
> > 0,
> > > > > > offset:
> > > > > > >> 3764
> > > > > > >>
> > > > > > >> These incremented skipped-records-[rate|total].
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Srikanth
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to