Hi Michael,

I was aware that the output should be written to a Kafka topic, not the console.
To understand whether streams can reach Kafka, as Eno asked in an earlier
email, I found
http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart,
went through the steps mentioned, and ran /usr/bin/kafka-run-class
org.apache.kafka.streams.examples.wordcount.WordCountDemo, which works.
However, running the program (e.g.
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181)
in my IDE was not and still is not working.

Best regards,
Mina

On Wed, Mar 15, 2017 at 4:43 AM, Michael Noll <mich...@confluent.io> wrote:

> Mina,
>
> in your original question you wrote:
>
> > However, I do not see the word count when I try to run below example.
> > Looks like that it does not connect to Kafka.
>
> The WordCount demo example writes its output to Kafka only -- it *does
> not* write any results to the console/STDOUT.
>
> From what I can tell, the WordCount example ran correctly because, in your
> latest email, you showed the output of the console consumer (which *does*
> write to the console), and that output is a list of words and counts:
>
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
>
> In other words, I think everything you did was correct, and Kafka too was
> working correctly. You were simply unaware that the WordCount example does
> not write its output to the console.
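Michael's point is that the results live in the output topic, not on STDOUT, and the console consumer output above shows exactly the counts the topology should produce. As a sanity check, here is a plain-Java sketch of just the word-count logic (the class and method names here are hypothetical stand-ins; the real example uses the Kafka Streams DSL, not plain collections), which yields the same counts the console consumer printed:

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

public class WordCountLogic {

    // Mirrors the topology's flatMapValues -> groupBy -> count steps
    // using plain collections instead of KStream/KTable.
    public static Map<String, Long> countWords(Iterable<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            // Same tokenization idea as the example: lowercase, split on non-word chars.
            for (String word : line.toLowerCase(Locale.getDefault()).split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList(
                "hello kafka streams",
                "all streams lead to kafka",
                "join kafka summit"));
        // Prints one "word<TAB>count" line per word, e.g. kafka -> 3, streams -> 2.
        counts.forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

Feeding in the three quickstart lines produces the same eight word/count pairs shown in the thread, which is a quick way to confirm the Streams job itself (rather than connectivity) behaved as expected.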
>
> Best,
> Michael
>
>
> On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani <aslanim...@gmail.com> wrote:
>
> > Hi,
> >
> > I just checked the streams-wordcount-output topic using the below command:
> >
> > docker run \
> >   --net=host \
> >   --rm \
> >   confluentinc/cp-kafka:3.2.0 \
> >   kafka-console-consumer --bootstrap-server localhost:9092 \
> >   --topic streams-wordcount-output \
> >   --from-beginning \
> >   --formatter kafka.tools.DefaultMessageFormatter \
> >   --property print.key=true \
> >   --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> >   --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
> >
> > and it returns
> >
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
> >
> > Please note the above result is from when I tried
> > http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> > and, in docker-machine, ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examples.wordcount.WordCountDemo.
> >
> > How come running the same program outside of docker-machine does not
> > write to the output topic?
> > Should I package the program as a jar, deploy it to docker-machine, and
> > run it using ./bin/kafka-run-class?
> >
> > Best regards,
> > Mina
> >
> >
> > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> >
> > > I even tried
> > > http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> > > and, in docker-machine, ran /usr/bin/kafka-run-class
> > > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> > >
> > > Running
> > >
> > > docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
> > >   kafka-console-consumer --bootstrap-server localhost:9092 \
> > >   --topic streams-wordcount-output --new-consumer --from-beginning
> > >
> > > shows 8 blank messages.
> > >
> > > Is there any setting/configuration that should be done? Running the
> > > class in the docker-machine and running the program outside the
> > > docker-machine do not return the same expected result!
> > >
> > > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> > >
> > >> And the port for kafka is 29092 and for zookeeper 32181.
> > >>
> > >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I forgot to add 2 questions in my previous email.
> > >>>
> > >>> To set up my env, shall I use
> > >>> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
> > >>> instead, or is there any other docker-compose.yml (version 2 or 3)
> > >>> which is suggested to set up the env?
> > >>>
> > >>> How can I check "whether streams (that is just an app) can reach Kafka"?
> > >>>
> > >>> Regards,
> > >>> Mina
> > >>>
> > >>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> > >>>
> > >>>> Hi Eno,
> > >>>>
> > >>>> Sorry! That is a typo!
> > >>>>
> > >>>> I have a docker-machine with different containers (set up as directed @
> > >>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
> > >>>>
> > >>>> docker ps --format "{{.Image}}: {{.Names}}"
> > >>>>
> > >>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> > >>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> > >>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> > >>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> > >>>> confluentinc/cp-kafka:3.2.0: kafka
> > >>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> > >>>>
> > >>>> I used the example @
> > >>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> > >>>> and followed the same steps.
> > >>>>
> > >>>> When I run the below command in docker-machine, I see the messages in
> > >>>> TextLinesTopic:
> > >>>>
> > >>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
> > >>>>   kafka-console-consumer --bootstrap-server localhost:29092 \
> > >>>>   --topic TextLinesTopic --new-consumer --from-beginning
> > >>>>
> > >>>> hello kafka streams
> > >>>> all streams lead to kafka
> > >>>> join kafka summit
> > >>>> test1
> > >>>> test2
> > >>>> test3
> > >>>> test4
> > >>>>
> > >>>> Running the above command for WordsWithCountsTopic returns nothing.
> > >>>>
> > >>>> My program runs outside of the docker machine, and it does not return
> > >>>> any error.
> > >>>>
> > >>>> I checked the kafka and kafka-connect logs; no information is shown.
> > >>>> Wondering what the log level is in kafka/kafka-connect.
> > >>>>
> > >>>> Best regards,
> > >>>> Mina
> > >>>>
> > >>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi there,
> > >>>>>
> > >>>>> I noticed in your example that you are using localhost:9092 to produce
> > >>>>> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka,
> > >>>>> and the Kafka Streams app all running within one docker container, or in
> > >>>>> different containers?
> > >>>>>
> > >>>>> I just tested the WordCountLambdaExample and it works for me. This
> > >>>>> might not have anything to do with streams, but rather with the Kafka
> > >>>>> configuration and whether streams (that is just an app) can reach Kafka
> > >>>>> at all. If you provide the above information we can look further.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Eno
> > >>>>>
> > >>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
> > >>>>> >
> > >>>>> > I reset and it is still not working!
> > >>>>> >
> > >>>>> > My env is set up using
> > >>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> > >>>>> >
> > >>>>> > I just tried using
> > >>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> > >>>>> > with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic)
> > >>>>> > created from scratch, and went through the steps as directed.
> > >>>>> >
> > >>>>> > When I stopped the java program and checked the topics, below is the
> > >>>>> > data in each topic.
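[Editor's note: Eno's question about the 9092/29092 port split points at a common pitfall with the cp-docker-images quickstart: clients connect to whatever address the broker *advertises*, so an app running outside the containers must use the advertised host port (29092 in this thread). The snippet below is a hypothetical docker-compose fragment modeled on that quickstart, not taken from this thread; the zookeeper port 32181 matches what Mina reported.]

```yaml
# Hypothetical docker-compose fragment (assumption: modeled on the
# confluentinc/cp-docker-images 3.2.0 quickstart, host networking).
kafka:
  image: confluentinc/cp-kafka:3.2.0
  network_mode: host
  environment:
    KAFKA_ZOOKEEPER_CONNECT: localhost:32181
    # A Streams app (or console consumer) outside the container must use
    # this advertised address, i.e. bootstrap.servers = localhost:29092.
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
```

If the Streams app's bootstrap.servers does not match the advertised listener, it can appear to hang silently, which is consistent with the "no error, no output" symptom described here.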
> > >>>>> >
> > >>>>> > docker run \
> > >>>>> >   --net=host \
> > >>>>> >   --rm \
> > >>>>> >   confluentinc/cp-kafka:3.2.0 \
> > >>>>> >   kafka-console-consumer --bootstrap-server localhost:29092 \
> > >>>>> >   --topic TextLinesTopic --new-consumer --from-beginning
> > >>>>> >
> > >>>>> > SHOWS
> > >>>>> >
> > >>>>> > hello kafka streams
> > >>>>> > all streams lead to kafka
> > >>>>> > join kafka summit
> > >>>>> > test1
> > >>>>> > test2
> > >>>>> > test3
> > >>>>> > test4
> > >>>>> >
> > >>>>> > For WordsWithCountsTopic nothing is shown.
> > >>>>> >
> > >>>>> > I am new to Kafka/Kafka Streams and still do not understand why a
> > >>>>> > simple example does not work!
> > >>>>> >
> > >>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>> >
> > >>>>> >>>> So, when I check the number of messages in wordCount-input I see
> > >>>>> >>>> the same messages. However, when I run below code I do not see any
> > >>>>> >>>> message/data in wordCount-output.
> > >>>>> >>
> > >>>>> >> Did you reset your application?
> > >>>>> >>
> > >>>>> >> Each time you run your app and restart it, it will resume processing
> > >>>>> >> where it left off. Thus, if something went wrong in your first run
> > >>>>> >> but you got committed offsets, the app will not re-read the whole
> > >>>>> >> topic.
> > >>>>> >>
> > >>>>> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
> > >>>>> >> application-id from StreamsConfig is used as the group.id.
> > >>>>> >>
> > >>>>> >> Thus, resetting your app would be required to consume the input topic
> > >>>>> >> from scratch. Or you could just write new data to your input topic.
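[Editor's note: Matthias's suggestion to inspect committed offsets can be done with the same docker setup used elsewhere in this thread. A sketch, assuming the cp-kafka container from the quickstart, the host port 29092 mentioned above, and the application.id "wordCount-streaming" from Mina's code (which acts as the group.id):]

```shell
# List consumer groups; a Streams app's application.id shows up as a group.id.
docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-consumer-groups --bootstrap-server localhost:29092 --list

# Show the committed offsets and lag for the Streams app's group.
docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-consumer-groups --bootstrap-server localhost:29092 \
  --describe --group wordCount-streaming
```

If the group's committed offset already equals the end offset of wordCount-input, the app will not re-read old records on restart, which matches Matthias's explanation.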
> > >>>>> >>
> > >>>>> >>>> Can I connect to kafka in VM/docker container using below code or
> > >>>>> >>>> do I need to change/add other parameters? How can I submit the
> > >>>>> >>>> code to kafka/kafka-connect? Do we have a similar concept as SPARK
> > >>>>> >>>> to submit the code (e.g. jar file)?
> > >>>>> >>
> > >>>>> >> A Streams app is a regular Java application and can run anywhere --
> > >>>>> >> there is no notion of a processing cluster and you don't "submit"
> > >>>>> >> your code -- you just run your app.
> > >>>>> >>
> > >>>>> >> Thus, if your console consumer can connect to the cluster, your
> > >>>>> >> Streams app should also be able to connect to the cluster.
> > >>>>> >>
> > >>>>> >> Maybe the short runtime of 5 seconds could be a problem (even if it
> > >>>>> >> seems long enough to process just a few records). But you might need
> > >>>>> >> to take startup delay into account. I would recommend registering a
> > >>>>> >> shutdown hook: see
> > >>>>> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> > >>>>> >>
> > >>>>> >> Hope this helps.
> > >>>>> >>
> > >>>>> >> -Matthias
> > >>>>> >>
> > >>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> > >>>>> >>> Hi Matthias,
> > >>>>> >>>
> > >>>>> >>> Thank you for the quick response, appreciate it!
> > >>>>> >>>
> > >>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed
> > >>>>> >>> some data to wordCount-input using
> > >>>>> >>>
> > >>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
> > >>>>> >>>   /bin/kafka-console-producer --broker-list localhost:9092 \
> > >>>>> >>>   --topic wordCount-input
> > >>>>> >>>
> > >>>>> >>> test
> > >>>>> >>> new
> > >>>>> >>> word
> > >>>>> >>> count
> > >>>>> >>> wordcount
> > >>>>> >>> word count
> > >>>>> >>>
> > >>>>> >>> So, when I check the number of messages in wordCount-input I see the
> > >>>>> >>> same messages. However, when I run below code I do not see any
> > >>>>> >>> message/data in wordCount-output.
> > >>>>> >>>
> > >>>>> >>> Can I connect to kafka in VM/docker container using below code or do
> > >>>>> >>> I need to change/add other parameters? How can I submit the code to
> > >>>>> >>> kafka/kafka-connect? Do we have a similar concept as SPARK to submit
> > >>>>> >>> the code (e.g. jar file)?
> > >>>>> >>>
> > >>>>> >>> I really appreciate your input as I am blocked and cannot run even
> > >>>>> >>> the simple example below.
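[Editor's note: Matthias's shutdown-hook recommendation can be sketched without a Kafka dependency. Here `FakeStreams` is a stand-in for `KafkaStreams` (an assumption for illustration only); the point is that a latch released from the shutdown hook replaces the fixed `Thread.sleep(5000L)`, so the app keeps running until the JVM is asked to stop:]

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

    // Stand-in for KafkaStreams: only start()/close() matter for the pattern.
    static class FakeStreams {
        volatile boolean running = false;
        void start() { running = true; }
        void close() { running = false; }
    }

    public static void main(String[] args) throws InterruptedException {
        final FakeStreams streams = new FakeStreams();
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the app cleanly on SIGTERM/Ctrl-C instead of sleeping a
        // fixed 5 seconds and hoping processing has finished.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }));

        streams.start();
        System.out.println("streams running: " + streams.running);

        // A real app would block here until shutdown, e.g. latch.await();
        // this sketch waits only briefly so the demo terminates on its own.
        latch.await(100, TimeUnit.MILLISECONDS);
    }
}
```

With this pattern, a slow startup no longer eats into the processing window, because there is no window: the app runs until it is told to stop.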
> > >>>>> >>>
> > >>>>> >>> Best regards,
> > >>>>> >>> Mina
> > >>>>> >>>
> > >>>>> >>> I changed the code to be as below:
> > >>>>> >>>
> > >>>>> >>> Properties props = new Properties();
> > >>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> > >>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> > >>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > >>>>> >>>     Serdes.String().getClass().getName());
> > >>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>>>> >>>     Serdes.String().getClass().getName());
> > >>>>> >>>
> > >>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> > >>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > >>>>> >>>
> > >>>>> >>> // setting offset reset to earliest so that we can re-run the demo
> > >>>>> >>> // code with the same pre-loaded data
> > >>>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
> > >>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> > >>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >>>>> >>>
> > >>>>> >>> KStreamBuilder builder = new KStreamBuilder();
> > >>>>> >>>
> > >>>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
> > >>>>> >>>
> > >>>>> >>> KTable<String, Long> counts = source
> > >>>>> >>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> > >>>>> >>>         @Override
> > >>>>> >>>         public Iterable<String> apply(String value) {
> > >>>>> >>>             return Arrays.asList(
> > >>>>> >>>                 value.toLowerCase(Locale.getDefault()).split(" "));
> > >>>>> >>>         }
> > >>>>> >>>     })
> > >>>>> >>>     .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
> > >>>>> >>>         @Override
> > >>>>> >>>         public KeyValue<String, String> apply(String key, String value) {
> > >>>>> >>>             return new KeyValue<>(value, value);
> > >>>>> >>>         }
> > >>>>> >>>     })
> > >>>>> >>>     .groupByKey()
> > >>>>> >>>     .count("Counts");
> > >>>>> >>>
> > >>>>> >>> // need to override value serde to Long type
> > >>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> > >>>>> >>>
> > >>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> > >>>>> >>> streams.start();
> > >>>>> >>>
> > >>>>> >>> // usually the stream application would be running forever,
> > >>>>> >>> // in this example we just let it run for some time and stop since
> > >>>>> >>> // the input data is finite.
> > >>>>> >>> Thread.sleep(5000L);
> > >>>>> >>>
> > >>>>> >>> streams.close();
> > >>>>> >>>
> > >>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>> >>>
> > >>>>> >>>> Maybe you need to reset your application using the reset tool:
> > >>>>> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
> > >>>>> >>>>
> > >>>>> >>>> Also keep in mind that KTables buffer internally, and thus, you
> > >>>>> >>>> might only see data on commit.
> > >>>>> >>>>
> > >>>>> >>>> Try to reduce the commit interval or disable caching by setting
> > >>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> > >>>>> >>>>
> > >>>>> >>>> -Matthias
> > >>>>> >>>>
> > >>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> > >>>>> >>>>> Hi,
> > >>>>> >>>>>
> > >>>>> >>>>> This is the first time that I am using Kafka Streams. I would like
> > >>>>> >>>>> to read from an input topic and write to an output topic. However,
> > >>>>> >>>>> I do not see the word count when I try to run the below example.
> > >>>>> >>>>> Looks like it does not connect to Kafka. I do not see any error though.
> > >>>>> >>>>> I tried my localhost kafka as well as the container in a VM; same
> > >>>>> >>>>> situation.
> > >>>>> >>>>>
> > >>>>> >>>>> There are over 200 messages in the input kafka topic.
> > >>>>> >>>>>
> > >>>>> >>>>> Your input is appreciated!
> > >>>>> >>>>>
> > >>>>> >>>>> Best regards,
> > >>>>> >>>>> Mina
> > >>>>> >>>>>
> > >>>>> >>>>> import org.apache.kafka.common.serialization.*;
> > >>>>> >>>>> import org.apache.kafka.streams.*;
> > >>>>> >>>>> import org.apache.kafka.streams.kstream.*;
> > >>>>> >>>>>
> > >>>>> >>>>> import java.util.*;
> > >>>>> >>>>> import java.util.regex.*;
> > >>>>> >>>>>
> > >>>>> >>>>> public class WordCountExample {
> > >>>>> >>>>>
> > >>>>> >>>>>     public static void main(String[] args) {
> > >>>>> >>>>>         final Properties streamsConfiguration = new Properties();
> > >>>>> >>>>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > >>>>> >>>>>             "wordcount-streaming");
> > >>>>> >>>>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > >>>>> >>>>>             "<IPADDRESS>:9092");
> > >>>>> >>>>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > >>>>> >>>>>             Serdes.String().getClass().getName());
> > >>>>> >>>>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>>>> >>>>>             Serdes.String().getClass().getName());
> > >>>>> >>>>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > >>>>> >>>>>             10 * 1000);
> > >>>>> >>>>>
> > >>>>> >>>>>         final Serde<String> stringSerde = Serdes.String();
> > >>>>> >>>>>         final Serde<Long> longSerde = Serdes.Long();
> > >>>>> >>>>>
> > >>>>> >>>>>         final KStreamBuilder builder = new KStreamBuilder();
> > >>>>> >>>>>
> > >>>>> >>>>>         final KStream<String, String> textLines =
> > >>>>> >>>>>             builder.stream(stringSerde, stringSerde, "wordcount-input");
> > >>>>> >>>>>
> > >>>>> >>>>>         final Pattern pattern = Pattern.compile("\\W+",
> > >>>>> >>>>>             Pattern.UNICODE_CHARACTER_CLASS);
> > >>>>> >>>>>
> > >>>>> >>>>>         final KStream<String, Long> wordCounts = textLines
> > >>>>> >>>>>             .flatMapValues(value ->
> > >>>>> >>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
> > >>>>> >>>>>             .groupBy((key, word) -> word)
> > >>>>> >>>>>             .count("Counts")
> > >>>>> >>>>>             .toStream();
> > >>>>> >>>>>
> > >>>>> >>>>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
> > >>>>> >>>>>
> > >>>>> >>>>>         final KafkaStreams streams = new KafkaStreams(builder,
> > >>>>> >>>>>             streamsConfiguration);
> > >>>>> >>>>>         streams.cleanUp();
> > >>>>> >>>>>         streams.start();
> > >>>>> >>>>>
> > >>>>> >>>>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> > >>>>> >>>>>     }
> > >>>>> >>>>> }
>
> --
> *Michael G. Noll*
> Product Manager | Confluent
> +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> <http://www.confluent.io/blog>