I am not sure if https://issues.apache.org/jira/browse/KAFKA-4740 is the
same issue as mine.
What I suspect may be happening, judging from these frames:
  at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
  at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.common.record.Record.value(Record.java:268)
~[kafka-clients-0.10.2.0.jar:na]

The size of the value does not match the actual buffer size.

It looks like the producer stored the message value as:

  4-byte payload length, containing length V
  V bytes of payload

The stored length V does not match the actual payload size.

Also, we are seeing this issue for the first time, and we have been running
the streams application for over six months now. At the moment it happens
for only a few messages.

We may need to catch such exceptions when getting the byte buffer for
keys/values, perhaps return null, and let the high-level client handle
such cases.

Thanks
Sachin



On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll <mich...@confluent.io> wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
>
>
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > Hmm, I re-read the stack trace again. It does look like the value side
> > is the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could it be that the record values are not the problem -- because your
> >> value deserializer does try-catch any such errors -- but that the
> >> record *keys* are malformed?  The built-in `Serdes.String()` does not
> >> try-catch deserialization errors, and from a quick look at the source
> >> it seems that the `Fetcher` class (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
> >> is throwing your error above ("Error deserializing key/value for
> >> partition..."), and the Fetcher is swallowing the more specific
> >> SerializationException of `Serdes.String()` (but it will include the
> >> original exception/Throwable in its own SerializationException).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sjmit...@gmail.com>
> >> wrote:
> >>
> >>> My streams application already runs in debug mode.
> >>> Also, I have checked the code around these lines:
> >>>
> >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement that would give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line, and perhaps handling the
> >>> exception there and setting the value to null may be the better option.
> >>> Yes, on the client side nothing can be done, because the exception
> >>> happens before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <damian....@gmail.com>
> >>> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> > message has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> > offset, and tries to read the message. If you did this in debug mode
> >>> > you might find out some more information.
> >>> >
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmit...@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Well, I tried to read that offset via kafka-console-consumer.sh
> >>> > > too, and it fails with the same error.
> >>> > >
> >>> > > So I was wondering if I can apply any of the suggestions as per
> >>> > >
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other way just to get the contents of that
> >>> > > message, it would be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian....@gmail.com>
> >>> > > wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> >>> > > > that offset on the topic, and seeing what the message is? Might
> >>> > > > be easier to debug? Like you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmit...@gmail.com>
> >>> > > > wrote:
> >>> > > >
> >>> > > > > I think I am following the third option.
> >>> > > > >
> >>> > > > > My pipeline is:
> >>> > > > >
> >>> > > > > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
> >>> > > > >
> >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> >>> > > > >   .filter(new Predicate<String, V>() { ...})
> >>> > > > >   .groupByKey()
> >>> > > > >   .aggregate(new Initializer<V1>() {...},
> >>> > > > >              new Aggregator<String, V, V1>() {...},
> >>> > > > >              windows, supplier)
> >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> >>> > > > >
> >>> > > > >
> >>> > > > > and in VDeserializer (implements Deserializer<V>) I am doing
> >>> > > > > something like this:
> >>> > > > >
> >>> > > > >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
> >>> > > > >         if (paramArrayOfByte == null) { return null; }
> >>> > > > >         V data = null;
> >>> > > > >         try {
> >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> >>> > > > >                 new TypeReference<V>() {});
> >>> > > > >         } catch (Exception e) {
> >>> > > > >             // swallow malformed values and return null instead
> >>> > > > >             e.printStackTrace();
> >>> > > > >         }
> >>> > > > >         return data;
> >>> > > > >     }
> >>> > > > >
> >>> > > > > So I am catching any exception that may happen when
> >>> > > > > deserializing the data.
> >>> > > > >
> >>> > > > > This is what third option suggest (if I am not mistaken).
> >>> > > > >
> >>> > > > > Please let me know, given the pipeline, which option would be
> >>> > > > > best and how we can incorporate it in our pipeline.
> >>> > > > >
> >>> > > > > Also note the exception is happening when reading from the
> >>> > > > > source topic, which is "advice-stream", so it looks like the
> >>> > > > > flow is not reaching our pipeline at all for us to handle. It
> >>> > > > > is terminating right at the consumer poll.
> >>> > > > >
> >>> > > > > Thanks
> >>> > > > > Sachin
> >>> > > > >
> >>> > > > >
> >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mich...@confluent.io>
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > Could this be a corrupted message ("poison pill") in your
> >>> topic?
> >>> > > > > >
> >>> > > > > > If so, take a look at
> >>> > > > > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > > > > >
> >>> > > > > > FYI: We're currently investigating a more elegant way to
> >>> > > > > > address such poison pill problems.  If you have feedback on
> >>> > > > > > that front, feel free to share it with us. :-)
> >>> > > > > >
> >>> > > > > > -Michael
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com>
> >>> > > > > > wrote:
> >>> > > > > >
> >>> > > > > > > Hi,
> >>> > > > > > > This is the first time we are getting a weird exception.
> >>> > > > > > > After this the streams application crashes.
> >>> > > > > > >
> >>> > > > > > > The only workaround is to manually seek and commit the
> >>> > > > > > > offset to a greater number, and we are needing this manual
> >>> > > > > > > intervention again and again.
> >>> > > > > > >
> >>> > > > > > > Any idea what is causing it and how we can circumvent this?
> >>> > > > > > >
> >>> > > > > > > Note this error happens in both cases, when the 10.2 client
> >>> > > > > > > or the 10.1.1 client connects to Kafka server 10.1.1.
> >>> > > > > > >
> >>> > > > > > > So this does not look like a version issue.
> >>> > > > > > >
> >>> > > > > > > Also we have the following settings:
> >>> > > > > > > message.max.bytes=5000013
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >>> > > > > > >
> >>> > > > > > > The rest is all default, and increasing the value of
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> >>> > > > > > >
> >>> > > > > > > Stack trace below.
> >>> > > > > > >
> >>> > > > > > > Thanks
> >>> > > > > > > Sachin
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
> >>> > > > > > > java.lang.IllegalArgumentException: null
> >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> >>> > > > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >>
> >
> >
>
