Ali,

adding to what Matthias said:

Kafka 0.10 changed the message format to add so-called "embedded
timestamps" into each Kafka message.  The Java producer included in Kafka
0.10 includes such embedded timestamps into any generated message as
expected.

However, other clients (like the go kafka plugin you are using) may not
have been updated yet to be compatible with the new 0.10 message format.
That's the root cause why see these "-1" negative timestamps.   (The same
negative timestamp problem also happens if you attempt to read messages
that were generated with pre-0.10 versions of Kafka's Java producer.)

FYI: Kafka Streams' default timestamp extractor attempts to read those new
embedded timestamps.  If there are no such embedded timestamps, you run
into these "negative timestamps" errors.

Now, how to fix your problem?

- Fix the root cause: Check if there's a newer version of your Go kafka
plugin that generates messages in the new Kafka 0.10 format.  If there is
no such version, ask the maintainers for an update. :-)

- Work around the problem:  As Matthias said, you can also tell Kafka
Streams to not use its default timestamp extractor.  You can fallback to
the WallclockTimestampExtractor, though this means your application will
not use event-time but processing-time when processing your data, which is
probably not what you want (but it does prevent the -1 timestamp errors).
If your data (generated by the go kafka plugin) *does* contain timestamp
information in the message payload, then the better option is to write a
custom timestamp extract that inspects each message, extracts the timestamp
from the payload, and returns it to Kafka Streams.  The Timestamp Extractor
section in [1] explains how to write a custom one and how to configure your
app to use it.

Hope this helps,
Michael



[1]
http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters






On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> If you restart your application, it will resume where is left off
> (same as any other Kafka consumer that does use group management and
> commits offsets).
>
> If you want to reprocess data from scratch, you need to reset your
> application using bin/kafka-streams-application-reset.sh
>
> See also
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-
> reset-tool
>
> and
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resett
> ing-a-streams-application/
>
>
> About the timestamp issue: it seems that your Go client does not
> assign valid timestamps when writing the data. As you already said,
> you need to provide a custom TimestampExtractor (or you
> WallclockTimestampExtractor if semantic permit) instead of default
> ConsumerRecordTimestampExtractor)
>
>
> - -Matthias
>
> On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > Thanks.
> >
> > I'm encountering a strange issue.
> >
> > If I create messages thru console-producer.sh on a new topic,
> > things work fine.
> >
> > But on the topic that I need to consume, the messages are being
> > produced via the go kafka plugin.
> >
> > On this topic, at first, nothing happens when the stream starts
> > (i.e it doesn't process the messages which are already in there)
> >
> > Then, if I produce new messages, then my exception handler is
> > called with the exception that timestamp is negative.
> >
> > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> >
> > I'm going to write a new timestamp extractor, but any ideas why
> > nothing happens with the old messages which are in the topic, it
> > only responds if i push new messages to this topic?
> >
> > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Sure.
> >
> > Just use #print() or #writeAsText()
> >
> >
> > -Matthias
> >
> > On 10/6/16 6:25 PM, Ali Akhtar wrote:
> >>>> What the subject says. For dev, it would be a lot easier if
> >>>> debugging info can be printed to stdin instead of another
> >>>> topic, where it will persist.
> >>>>
> >>>> Any ideas if this is possible?
> >>>>
> >>
> >
> -----BEGIN PGP SIGNATURE-----
> Comment: GPGTools - https://gpgtools.org
>
> iQIcBAEBCgAGBQJX9xO7AAoJECnhiMLycopPLFwQAK76xmPobB5xFdE/qFWm94d5
> 8lj8LahMMBUCG4xWCEs4shvGHjkh2kx2cUZmdgkUSLtEy7HGK6MgmjTa8Smse+0f
> JxQ0f/F8lkMvJKhuw9wmgOKnT/b/U/jRXvUedWvYXp/r7d8Z36DgW9hzO9Yx7Ugq
> qafvCfMdScE4FZEOoU/wJLiRJ3FZZsEobQSbyXR9Vmjs9UYUExUpq02B2N0ItvWz
> 6JPtWNC2PWSlc7j7C7PK0XYeexuE/ZK9yLrM7iuXh6fYeTy3MtBV3pHsDn3d+I2m
> AOUUMyJt4kaSyMX0BzWZVVKZVvdw7rbbGfsZisw67Mko2p+De2KoIEF3yEEvYEit
> Vks00KzGZ1gvGdMDvKzJJRkMVLUxl2R4LxH/TEwXKn5WYQullEHtQ3xb0U7sl2Ae
> UkIgOw0LY3sQj7NL2OOnt9dMS5m0r+FZPlMoYNmN7coAGxo98iKacIqR1tc3f2qs
> NxW2iUvD9lzVaZhaMOY3AjD1Q2G7yyQ+wRdlcZtNkAAID4mNrbyu7MKT7x6paLUf
> OXGjtl8tcMwegSqZtpEZnJFSquu0SuX2UraDWDiz6NaW+TYIM8Qnq3oF9gWDQX+I
> gvtqMiGzxxs4ZW9ysueT+X1MRoPRrnwlXPQ7XVFR6oKMEJrw0W2x8TkyHJiXVdls
> ZBA0KEjx9U8NNf+eiqN5
> =UMbs
> -----END PGP SIGNATURE-----
>

Reply via email to