Guozhang,

That worked!! Thanks a lot for your timely response. See you with another issue.

Regards,
Buvana

-----Original Message-----
From: EXT Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Sunday, May 01, 2016 8:15 PM
To: users@kafka.apache.org
Subject: Re: Invalid TimeStamp Error while running WordCountDemo - kafka-0.10.0

Hello Buvana,

I realized that this is caused by a recent change in Kafka Streams that
exposed an issue in the console producer: it does not set the timestamp
implicitly when using the new producer. I just filed
https://issues.apache.org/jira/browse/KAFKA-3646 for this.

As for your case, since you are building from source code, I would suggest
you just modify a one-liner in kafka.producer.BaseProducer (scala code),
line 43 as:


val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null,
System.currentTimeMillis(), key, value)


to explicitly set the timestamp as current system time in milliseconds.
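
To illustrate why the explicit timestamp matters, here is a minimal,
self-contained sketch. It does not use the Kafka client library:
SketchRecord, forward and produce are hypothetical stand-ins, and the
sketch assumes (as the stack trace suggests) that a record produced
without a timestamp reaches the Streams sink with the value -1.

```java
public class TimestampSketch {

    /** Simplified stand-in for a producer record; NOT the real Kafka class. */
    static final class SketchRecord {
        final String topic;
        final Long timestamp; // null means "not set by the caller"
        final String value;

        SketchRecord(String topic, Long timestamp, String value) {
            // Same kind of guard that reported "Invalid timestamp -1" in the thread.
            if (timestamp != null && timestamp < 0) {
                throw new IllegalArgumentException("Invalid timestamp " + timestamp);
            }
            this.topic = topic;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Downstream step: re-emits a value with the timestamp it read upstream. */
    static SketchRecord forward(String topic, long upstreamTimestamp, String value) {
        return new SketchRecord(topic, upstreamTimestamp, value);
    }

    /** Producer-side workaround: stamp the record with the current system time. */
    static SketchRecord produce(String topic, String value) {
        return new SketchRecord(topic, System.currentTimeMillis(), value);
    }

    public static void main(String[] args) {
        // An input record without a timestamp (assumed to appear as -1) is rejected.
        try {
            forward("sketch-output", -1L, "hello kafka streams");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // With an explicit timestamp at produce time, forwarding succeeds.
        SketchRecord fixed = produce("sketch-input", "hello kafka streams");
        SketchRecord out = forward("sketch-output", fixed.timestamp, fixed.value);
        System.out.println("accepted: " + (out.timestamp >= 0));
    }
}
```

The point of the sketch is only that the failure shows up at the sink,
while the fix belongs on the producing side.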


Guozhang



On Fri, Apr 29, 2016 at 10:34 AM, Ramanan, Buvana (Nokia - US) <
buvana.rama...@nokia.com> wrote:

> Hello Guozhang,
>
> thanks a lot for your response (to this and all of my previous questions).
> Here is how I produce to the topic:
> cat /tmp/file-input.txt | ./kafka-console-producer.sh --broker-list
> localhost:9092 --topic streams-file-input
>
> Here is the content of the file:
> ~/kafka-0.10.0/bin$ cat /tmp/file-input.txt
> all streams lead to kafka
> hello kafka streams
> join kafka summit
>
> I checked the topic and made sure there are text lines in there.
>
> As for version:
> I downloaded kafka-0.10.0 on April 22nd and built it following the
> instructions. Did not have any issues with build.
> I was able to successfully run the wordCountDemo on April 22nd
>
> It's weird that I am not able to run it now.
>
> regards,
> Buvana
> ________________________________________
> From: EXT Guozhang Wang [wangg...@gmail.com]
> Sent: Thursday, April 28, 2016 4:43 PM
> To: users@kafka.apache.org
> Subject: Re: Invalid TimeStamp Error while running WordCountDemo -
> kafka-0.10.0
>
> Hello Buvana,
>
> Could you show me the command line you used to produce the text to Kafka as
> input? Also which version of Kafka are you using for the broker?
>
> Guozhang
>
> On Wed, Apr 27, 2016 at 12:07 PM, Ramanan, Buvana (Nokia - US) <
> buvana.rama...@nokia.com> wrote:
>
> > Hello,
> >
> > I am trying to execute WordCountDemo app. I produced text to the input
> > topic. But when I execute the WordCountDemo, I get an error.
> >
> > please help resolve the following:
> > ERROR Streams application error during processing in thread
> > [StreamThread-1]:
> > (org.apache.kafka.streams.processor.internals.StreamThread)
> > java.lang.IllegalArgumentException: Invalid timestamp -1
> >
> > broker, consumer & zk are running on the same machine. Ubuntu 14.04, java
> > 1.8.
> >
> > Thanks,
> > Buvana
> >
> > ~/kafka-0.10.0/bin$ ./kafka-run-class.sh
> > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> > SLF4J: Class path contains multiple SLF4J bindings.
> > SLF4J: Found binding in [jar:file:/home/buvana/kafka-0.10.0/core/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: Found binding in [jar:file:/home/buvana/kafka-0.10.0/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: Found binding in [jar:file:/home/buvana/kafka-0.10.0/connect/api/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: Found binding in [jar:file:/home/buvana/kafka-0.10.0/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: Found binding in [jar:file:/home/buvana/kafka-0.10.0/connect/file/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: Found binding in [jar:file:/home/buvana/kafka-0.10.0/connect/json/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > explanation.
> > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> > [2016-04-27 14:56:04,967] WARN The configuration replication.factor = 1
> > was supplied but isn't a known config.
> > (org.apache.kafka.clients.consumer.ConsumerConfig)
> > [2016-04-27 14:56:04,968] WARN The configuration num.standby.replicas = 0
> > was supplied but isn't a known config.
> > (org.apache.kafka.clients.consumer.ConsumerConfig)
> > [2016-04-27 14:56:04,968] WARN The configuration zookeeper.connect =
> > localhost:2181 was supplied but isn't a known config.
> > (org.apache.kafka.clients.consumer.ConsumerConfig)
> > [2016-04-27 14:56:04,968] WARN The configuration
> > __stream.thread.instance__ = Thread[StreamThread-1,5,main] was supplied but
> > isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> > [2016-04-27 14:56:05,736] ERROR Streams application error during
> > processing in thread [StreamThread-1]:
> > (org.apache.kafka.streams.processor.internals.StreamThread)
> > java.lang.IllegalArgumentException: Invalid timestamp -1
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:60)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
> >     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
> >     at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:89)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
> >     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
> >     at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:56)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:350)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
> >
> >
> >
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
