Ali, adding to what Matthias said:
Kafka 0.10 changed the message format to add so-called "embedded timestamps" to each Kafka message. The Java producer shipped with Kafka 0.10 embeds such a timestamp in every message it generates, as expected. However, other clients (like the Go Kafka plugin you are using) may not have been updated yet to be compatible with the new 0.10 message format. That is the root cause of the "-1" negative timestamps you are seeing. (The same negative timestamp problem also happens if you attempt to read messages that were generated with pre-0.10 versions of Kafka's Java producer.)

FYI: Kafka Streams' default timestamp extractor attempts to read those new embedded timestamps. If a message has no embedded timestamp, you run into these "negative timestamps" errors.

Now, how to fix your problem?

- Fix the root cause: Check if there's a newer version of your Go Kafka plugin that generates messages in the new Kafka 0.10 format. If there is no such version, ask the maintainers for an update. :-)

- Work around the problem: As Matthias said, you can also tell Kafka Streams to not use its default timestamp extractor. You can fall back to the WallclockTimestampExtractor, though this means your application will use processing-time rather than event-time when processing your data, which is probably not what you want (but it does prevent the -1 timestamp errors). If your data (generated by the Go Kafka plugin) *does* contain timestamp information in the message payload, then the better option is to write a custom timestamp extractor that inspects each message, extracts the timestamp from the payload, and returns it to Kafka Streams. The Timestamp Extractor section in [1] explains how to write a custom one and how to configure your app to use it.

Hope this helps,
Michael

[1] http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters

On Fri, Oct 7, 2016 at 5:17 AM, Matthias J.
Sax <matth...@confluent.io> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> If you restart your application, it will resume where is left off
> (same as any other Kafka consumer that does use group management and
> commits offsets).
>
> If you want to reprocess data from scratch, you need to reset your
> application using bin/kafka-streams-application-reset.sh
>
> See also
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-reset-tool
>
> and
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>
>
> About the timestamp issue: it seems that your Go client does not
> assign valid timestamps when writing the data. As you already said,
> you need to provide a custom TimestampExtractor (or you
> WallclockTimestampExtractor if semantic permit) instead of default
> ConsumerRecordTimestampExtractor)
>
>
> - -Matthias
>
> On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > Thanks.
> >
> > I'm encountering a strange issue.
> >
> > If I create messages thru console-producer.sh on a new topic,
> > things work fine.
> >
> > But on the topic that I need to consume, the messages are being
> > produced via the go kafka plugin.
> >
> > On this topic, at first, nothing happens when the stream starts
> > (i.e it doesn't process the messages which are already in there)
> >
> > Then, if I produce new messages, then my exception handler is
> > called with the exception that timestamp is negative.
> >
> > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> >
> > I'm going to write a new timestamp extractor, but any ideas why
> > nothing happens with the old messages which are in the topic, it
> > only responds if i push new messages to this topic?
> >
> > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Sure.
> >
> > Just use #print() or #writeAsText()
> >
> >
> > -Matthias
> >
> > On 10/6/16 6:25 PM, Ali Akhtar wrote:
> >>>> What the subject says. For dev, it would be a lot easier if
> >>>> debugging info can be printed to stdin instead of another
> >>>> topic, where it will persist.
> >>>>
> >>>> Any ideas if this is possible?
> >>>>
> >>
> >
> -----BEGIN PGP SIGNATURE-----
> Comment: GPGTools - https://gpgtools.org
>
> iQIcBAEBCgAGBQJX9xO7AAoJECnhiMLycopPLFwQAK76xmPobB5xFdE/qFWm94d5
> 8lj8LahMMBUCG4xWCEs4shvGHjkh2kx2cUZmdgkUSLtEy7HGK6MgmjTa8Smse+0f
> JxQ0f/F8lkMvJKhuw9wmgOKnT/b/U/jRXvUedWvYXp/r7d8Z36DgW9hzO9Yx7Ugq
> qafvCfMdScE4FZEOoU/wJLiRJ3FZZsEobQSbyXR9Vmjs9UYUExUpq02B2N0ItvWz
> 6JPtWNC2PWSlc7j7C7PK0XYeexuE/ZK9yLrM7iuXh6fYeTy3MtBV3pHsDn3d+I2m
> AOUUMyJt4kaSyMX0BzWZVVKZVvdw7rbbGfsZisw67Mko2p+De2KoIEF3yEEvYEit
> Vks00KzGZ1gvGdMDvKzJJRkMVLUxl2R4LxH/TEwXKn5WYQullEHtQ3xb0U7sl2Ae
> UkIgOw0LY3sQj7NL2OOnt9dMS5m0r+FZPlMoYNmN7coAGxo98iKacIqR1tc3f2qs
> NxW2iUvD9lzVaZhaMOY3AjD1Q2G7yyQ+wRdlcZtNkAAID4mNrbyu7MKT7x6paLUf
> OXGjtl8tcMwegSqZtpEZnJFSquu0SuX2UraDWDiz6NaW+TYIM8Qnq3oF9gWDQX+I
> gvtqMiGzxxs4ZW9ysueT+X1MRoPRrnwlXPQ7XVFR6oKMEJrw0W2x8TkyHJiXVdls
> ZBA0KEjx9U8NNf+eiqN5
> =UMbs
> -----END PGP SIGNATURE-----
>
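
To make the custom timestamp extractor idea from the top of this mail a bit more concrete, here is a rough sketch of just the payload-parsing part in plain Java. Everything specific in it is an assumption for illustration: the class name PayloadTimestamps, the method extractMillis, and above all the payload layout ("<epochMillis>,<rest of message>"), since I don't know what your Go client actually writes. The part that matters is the fallback, so that Kafka Streams never gets a negative timestamp back.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper. The payload layout "<epochMillis>,<rest of message>"
// is an assumption for illustration, not something the Go client is known to emit.
//
// In a Kafka Streams app, a class implementing
// org.apache.kafka.streams.processor.TimestampExtractor could call this from its
// extract(...) method (assuming record values arrive as byte[]) and would be
// registered through the "timestamp.extractor" setting.
final class PayloadTimestamps {

    private PayloadTimestamps() {}

    /**
     * Returns the event time in epoch milliseconds, read from the first
     * comma-separated field of the payload (or from the whole payload if it
     * contains no comma). Returns fallbackMillis if the payload is null or the
     * field is not a non-negative number.
     */
    static long extractMillis(byte[] payload, long fallbackMillis) {
        if (payload == null) {
            return fallbackMillis;
        }
        String text = new String(payload, StandardCharsets.UTF_8);
        int comma = text.indexOf(',');
        String field = (comma >= 0 ? text.substring(0, comma) : text).trim();
        try {
            long millis = Long.parseLong(field);
            // Never hand a negative timestamp back to Kafka Streams.
            return millis >= 0 ? millis : fallbackMillis;
        } catch (NumberFormatException e) {
            return fallbackMillis;
        }
    }
}
```

Inside the extractor you would typically pass something like System.currentTimeMillis() as the fallback. That gives you event-time whenever the payload carries a usable timestamp and processing-time otherwise, which may or may not be acceptable for your use case.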