usubscribe
Re: Kafka stream specify key for message
.to() method is called on KStream, like KStream.to(outputTopic, Produced ), See sample code below- KStream inputStream = builder.stream("input-topic"); KStream outputStream = inputStream.map(new KeyValueMapper> { KeyValue apply(String key, String value) { return new KeyValue<>(key.toUpperCase(), value.split(" ").length); } }); KStream contains And key values derived from key of input-topic. Is this code key is upper-case-value for input-topic keys. You don't have to set values of the key in Produced class, it needs the type/serde of key , eg- byteArraySerde. On Thu, Jun 14, 2018 at 9:14 AM pradeep s wrote: > Hi, > In kafka stream, when we use *to *method for sending values to a topic, is > there a way to mention the message key . > > .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde)); > > In Produced class , i cant find a way to set the key. > > > https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/Produced.html > > Thanks > > Pradeep >
Re: Kafka mirror maker help
You should share related info, such source-destination Kafka versions, sample Config or error if any. FYI, Go through https://kafka.apache.org/documentation/#basic_ops_mirror_maker
Re: Need help in kafka hdfs connector
Try this : https://github.com/pinterest/secor On Thu, Mar 29, 2018 at 2:36 PM, Santosh Kumar J P < santoshkumar...@gmail.com> wrote: > Hi, > > Do we have any other Kafka HDFS connectors implementation other than > Confluent HDFS connector. > > Thank you, > Regards, > Santosh >
Kafka topic monitoring
Hi All, I'm looking for a open source tool which can help in monitoring topic level, partition level, consumer group level historycal data. use case: 1. Finding when consumer lag started on which partition on yesterdays data. 2. messages in-rate in each partition. https://github.com/yahoo/kafka-manager helpfull in many cases, but this provide data for last15 min only. https://github.com/yahoo/kafka-manager/blob/master/img/broker.png Regards Amrit
Re: Viewing timestamps with console consumer (0.10.1.1)
Hi Meghana, Please Try : kafka-console-consumer.sh --property print.timestamp=true - Amrit On Mon, Jan 30, 2017 at 10:20 PM, Meghana Narasimhan < mnarasim...@bandwidth.com> wrote: > Hi, > Is there a way to view message timestamp using console consumer ? > > Thanks, > Meghana >
Error in kafka-stream example
Hi All, I want to try out kafka stream example using this example : https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala Getting exception while compiling code : *[error] val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(x => makeCaps(x))* *[error] ^* *[error] /home/ubuntu/stream/MapFunctionScalaExample.scala:32: missing parameter type* *[error] val uppercasedWithMap: KStream[Array[Byte], String] = textLines.map((key, value) => (key, value.toUpperCase()))* *[error] ^* Is this example working for anyone, Has anyone tried it out? This is my sbt file : name := "Kafka_Stream_Test" organization := "com.goibibo" version := "0.1" scalaVersion := "2.11.0" javacOptions ++= Seq("-source", "1.8", "-target", "1.8") resolvers ++= Seq( "confluent-repository" at "http://packages.confluent.io/maven/; ) libraryDependencies ++= Seq( "org.apache.kafka" % "kafka-streams" % "0.10.1.0-cp2", "org.apache.kafka" % "kafka-clients" % "0.10.1.0-cp2" ) Is there anything i'm missing here ? Regards, Amrit