Mina,

in your original question you wrote:

> However, I do not see the word count when I try to run below example.
> Looks like that it does not connect to Kafka.

The WordCount demo example writes its output to Kafka only --  it *does
not* write any results to the console/STDOUT.

From what I can tell the WordCount example ran correctly because, in your
latest email, you showed the output of the console consumer (which *does*
write to the console), and that output is a list of words and counts:

> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
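
Those counts can be cross-checked offline. The sketch below is plain Java
(no Kafka involved) and assumes the input consisted of the three sample
sentences that appear elsewhere in this thread. The class name
WordCountCheck and the helper countWords are made up for illustration and
are not part of the WordCount demo.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountCheck {

    // Counts lower-cased words across all lines, splitting on runs of
    // non-word characters (hypothetical helper, not from the demo).
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumption: these three sentences were the only input.
        List<String> lines = Arrays.asList(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit");
        countWords(lines).forEach(
                (word, count) -> System.out.println(word + " " + count));
    }
}
```

The order of the printed lines may differ from the order in the output
topic, but the per-word counts should agree (for example, kafka 3 and
streams 2).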

In other words, I think everything you did was correct, and Kafka too was
working correctly.  You were simply unaware that the WordCount example does
not write its output to the console.
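
As a side note on the earlier run that printed 8 blank messages: that
consumer was started without the value.deserializer property. The counts
are written as 8-byte big-endian longs, so printing the raw bytes as text
yields only non-printable characters. The following is a minimal sketch in
plain Java, not the console consumer's actual code; LongValueDemo and its
methods are hypothetical names, and ByteBuffer stands in here for Kafka's
LongSerializer/LongDeserializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongValueDemo {

    // Encodes a long as 8 big-endian bytes (ByteBuffer's default order).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Reads the 8 bytes back as a long, as a Long deserializer would.
    static long decodeAsLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Interprets the same bytes as text, as happens when no
    // value deserializer is configured.
    static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = encodeLong(3L);
        System.out.println("as long: " + decodeAsLong(raw));
        boolean printable = decodeAsString(raw).chars().anyMatch(ch -> ch > 0x20);
        System.out.println("as text, any printable character: " + printable);
    }
}
```

This is why adding the LongDeserializer property (together with
print.key=true) turned the blank lines into readable word/count pairs.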

Best,
Michael





On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani <aslanim...@gmail.com> wrote:

> Hi,
> I just checked streams-wordcount-output topic using below command
>
> docker run \
>   --net=host \
>   --rm \
>   confluentinc/cp-kafka:3.2.0 \
>   kafka-console-consumer --bootstrap-server localhost:9092 \
>           --topic streams-wordcount-output \
>           --from-beginning \
>           --formatter kafka.tools.DefaultMessageFormatter \
>           --property print.key=true \
>           --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>           --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
>
>
> and it returns
>
> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
>
> Please note above result is when I tried
> http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> and in docker-machine ran
> /usr/bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo.
>
> How come running same program out of docker-machine does not output to the
> output topic?
> Should I make the program as jar and deploy to docker-machine and run it
> using ./bin/kafka-run-class?
>
> Best regards,
> Mina
>
>
>
> On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <aslanim...@gmail.com>
> wrote:
>
> > I even tried
> > http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> >
> > and in docker-machine  ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> >
> > Running
> >
> > docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > kafka-console-consumer --bootstrap-server localhost:9092 --topic
> > streams-wordcount-output --new-consumer --from-beginning
> >
> > shows 8 blank messages
> >
> > Is there any setting/configuration should be done as running the class in
> > the docker-machine and running program outside the docker-machine does not
> > return expected result!
> >
> > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com>
> > wrote:
> >
> >> And the port for kafka is 29092 and for zookeeper 32181.
> >>
> >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I forgot to add in my previous email 2 questions.
> >>>
> >>> To setup my env, shall I use
> >>> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
> >>> instead or is there any other
> >>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
> >>>
> >>> How can I check "whether streams (that is just an app) can reach Kafka"?
> >>>
> >>> Regards,
> >>> Mina
> >>>
> >>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Eno,
> >>>>
> >>>> Sorry! That is a typo!
> >>>>
> >>>> I have a docker-machine with different containers (setup as directed @
> >>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
> >>>>
> >>>> docker ps --format "{{.Image}}: {{.Names}}"
> >>>>
> >>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> >>>>
> >>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> >>>>
> >>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> >>>>
> >>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
> >>>>
> >>>> confluentinc/cp-kafka:3.2.0: kafka
> >>>>
> >>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
> >>>>
> >>>> I used example @
> >>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>>> and followed the same steps.
> >>>>
> >>>> When I run below command in docker-machine, I see the messages in
> >>>> TextLinesTopic.
> >>>>
> >>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 kafka-console-consumer
> >>>> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
> >>>> --from-beginning
> >>>>
> >>>> hello kafka streams
> >>>>
> >>>> all streams lead to kafka
> >>>>
> >>>> join kafka summit
> >>>>
> >>>> test1
> >>>>
> >>>> test2
> >>>>
> >>>> test3
> >>>>
> >>>> test4
> >>>>
> >>>> Running above command for WordsWithCountsTopic returns nothing*.*
> >>>>
> >>>> My program runs out of docker machine, and it does not return any
> >>>> error.
> >>>>
> >>>> I checked kafka logs and kafka-connect logs, no information is shown.
> >>>> Wondering what is the log level in kafka/kafka-connect.
> >>>>
> >>>>
> >>>> Best regards,
> >>>> Mina
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi there,
> >>>>>
> >>>>> I noticed in your example that you are using localhost:9092 to produce
> >>>>> but localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and
> >>>>> the Kafka Streams app all running within one docker container, or in
> >>>>> different containers?
> >>>>>
> >>>>> I just tested the WordCountLambdaExample and it works for me. This
> >>>>> might not have anything to do with streams, but rather with the Kafka
> >>>>> configuration and whether streams (that is just an app) can reach Kafka at
> >>>>> all. If you provide the above information we can look further.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>> > On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
> >>>>> >
> >>>>> > I reset and still not working!
> >>>>> >
> >>>>> > My env is setup using
> >>>>> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> >>>>> >
> >>>>> > I just tried using
> >>>>> > https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>>>> > with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic) created
> >>>>> > from scratch as went through the steps as directed.
> >>>>> >
> >>>>> > When I stopped the java program and check the topics below are the data in
> >>>>> > each topic.
> >>>>> >
> >>>>> > docker run \
> >>>>> >  --net=host \
> >>>>> >  --rm \
> >>>>> >  confluentinc/cp-kafka:3.2.0 \
> >>>>> >  kafka-console-consumer --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer --from-beginning
> >>>>> >
> >>>>> >
> >>>>> > SHOWS
> >>>>> >
> >>>>> > hello kafka streams
> >>>>> >
> >>>>> > all streams lead to kafka
> >>>>> >
> >>>>> > join kafka summit
> >>>>> >
> >>>>> > test1
> >>>>> >
> >>>>> > test2
> >>>>> >
> >>>>> > test3
> >>>>> >
> >>>>> > test4
> >>>>> >
> >>>>> > FOR WordsWithCountsTopic nothing is shown
> >>>>> >
> >>>>> >
> >>>>> > I am new to the Kafka/Kafka Stream and still do not understand why a simple
> >>>>> > example does not work!
> >>>>> >
> >>>>> > On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
> >>>>> > wrote:
> >>>>> >
> >>>>> >>>> So, when I check the number of messages in wordCount-input I see the same
> >>>>> >>>> messages. However, when I run below code I do not see any message/data in
> >>>>> >>>> wordCount-output.
> >>>>> >>
> >>>>> >> Did you reset your application?
> >>>>> >>
> >>>>> >> Each time you run your app and restart it, it will resume processing
> >>>>> >> where it left off. Thus, if something went wrong in your first run but
> >>>>> >> you got committed offsets, the app will not re-read the whole topic.
> >>>>> >>
> >>>>> >> You can check committed offsets via bin/kafka-consumer-groups.sh. The
> >>>>> >> application.id from StreamsConfig is used as the group.id.
> >>>>> >>
> >>>>> >> Thus, resetting your app would be required to consume the input topic
> >>>>> >> from scratch. Or you just write new data to your input topic.
> >>>>> >>
> >>>>> >>>> Can I connect to kafka in VM/docker container using below code or do I need
> >>>>> >>>> to change/add other parameters? How can I submit the code to
> >>>>> >>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >>>>> >>>> code(e.g. jar file)?
> >>>>> >>
> >>>>> >> A Streams app is a regular Java application and can run anywhere --
> >>>>> >> there is no notion of a processing cluster and you don't "submit" your
> >>>>> >> code -- you just run your app.
> >>>>> >>
> >>>>> >> Thus, if your console consumer can connect to the cluster, your Streams
> >>>>> >> app should also be able to connect to the cluster.
> >>>>> >>
> >>>>> >>
> >>>>> >> Maybe the short runtime of 5 seconds could be a problem (even if it
> >>>>> >> seems long to process just a few records). But you might need to take
> >>>>> >> startup delay into account. I would recommend registering a shutdown
> >>>>> >> hook: see
> >>>>> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> >>>>> >>
> >>>>> >>
> >>>>> >> Hope this helps.
> >>>>> >>
> >>>>> >> -Matthias
> >>>>> >>
> >>>>> >>
> >>>>> >> On 3/13/17 7:30 PM, Mina Aslani wrote:
> >>>>> >>> Hi Matthias,
> >>>>> >>>
> >>>>> >>> Thank you for the quick response, appreciate it!
> >>>>> >>>
> >>>>> >>> I created the topics wordCount-input and wordCount-output. Pushed some data
> >>>>> >>> to wordCount-input using
> >>>>> >>>
> >>>>> >>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
> >>>>> >>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
> >>>>> >>> wordCount-input
> >>>>> >>>
> >>>>> >>> test
> >>>>> >>>
> >>>>> >>> new
> >>>>> >>>
> >>>>> >>> word
> >>>>> >>>
> >>>>> >>> count
> >>>>> >>>
> >>>>> >>> wordcount
> >>>>> >>>
> >>>>> >>> word count
> >>>>> >>>
> >>>>> >>> So, when I check the number of messages in wordCount-input I see the same
> >>>>> >>> messages. However, when I run below code I do not see any message/data in
> >>>>> >>> wordCount-output.
> >>>>> >>>
> >>>>> >>> Can I connect to kafka in VM/docker container using below code or do I need
> >>>>> >>> to change/add other parameters? How can I submit the code to
> >>>>> >>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >>>>> >>> code(e.g. jar file)?
> >>>>> >>>
> >>>>> >>> I really appreciate your input as I am blocked and cannot run even below
> >>>>> >>> simple example.
> >>>>> >>>
> >>>>> >>> Best regards,
> >>>>> >>> Mina
> >>>>> >>>
> >>>>> >>> I changed the code to be as below:
> >>>>> >>>
> >>>>> >>> Properties props = new Properties();
> >>>>> >>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> >>>>> >>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> >>>>> >>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>> >>>     Serdes.String().getClass().getName());
> >>>>> >>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>> >>>     Serdes.String().getClass().getName());
> >>>>> >>>
> >>>>> >>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> >>>>> >>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >>>>> >>>
> >>>>> >>> // setting offset reset to earliest so that we can re-run the demo
> >>>>> >>> // code with the same pre-loaded data
> >>>>> >>> // Note: To re-run the demo, you need to use the offset reset tool:
> >>>>> >>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> >>>>> >>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>> >>>
> >>>>> >>> KStreamBuilder builder = new KStreamBuilder();
> >>>>> >>>
> >>>>> >>> KStream<String, String> source = builder.stream("wordCount-input");
> >>>>> >>>
> >>>>> >>> KTable<String, Long> counts = source
> >>>>> >>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >>>>> >>>         @Override
> >>>>> >>>         public Iterable<String> apply(String value) {
> >>>>> >>>            return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
> >>>>> >>>         }
> >>>>> >>>      }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
> >>>>> >>>         @Override
> >>>>> >>>         public KeyValue<String, String> apply(String key, String value) {
> >>>>> >>>            return new KeyValue<>(value, value);
> >>>>> >>>         }
> >>>>> >>>      })
> >>>>> >>>      .groupByKey()
> >>>>> >>>      .count("Counts");
> >>>>> >>>
> >>>>> >>> // need to override value serde to Long type
> >>>>> >>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >>>>> >>>
> >>>>> >>> KafkaStreams streams = new KafkaStreams(builder, props);
> >>>>> >>> streams.start();
> >>>>> >>>
> >>>>> >>> // usually the stream application would be running forever,
> >>>>> >>> // in this example we just let it run for some time and stop since the
> >>>>> >>> // input data is finite.
> >>>>> >>> Thread.sleep(5000L);
> >>>>> >>>
> >>>>> >>> streams.close();
> >>>>> >>>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
> >>>>> >>> wrote:
> >>>>> >>>
> >>>>> >>>> Maybe you need to reset your application using the reset tool:
> >>>>> >>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
> >>>>> >>>>
> >>>>> >>>> Also keep in mind, that KTables buffer internally, and thus, you might
> >>>>> >>>> only see data on commit.
> >>>>> >>>>
> >>>>> >>>> Try to reduce commit interval or disable caching by setting
> >>>>> >>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>>>> >>>>
> >>>>> >>>>
> >>>>> >>>> -Matthias
> >>>>> >>>>
> >>>>> >>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>>>> >>>>> Hi,
> >>>>> >>>>>
> >>>>> >>>>> This is the first time that am using Kafka Stream. I would like to read
> >>>>> >>>>> from input topic and write to output topic. However, I do not see the word
> >>>>> >>>>> count when I try to run below example. Looks like that it does not connect
> >>>>> >>>>> to Kafka. I do not see any error though. I tried my localhost kafka as well
> >>>>> >>>>> as the container in a VM, same situation.
> >>>>> >>>>>
> >>>>> >>>>> There are over 200 message in the input kafka topic.
> >>>>> >>>>>
> >>>>> >>>>> Your input is appreciated!
> >>>>> >>>>>
> >>>>> >>>>> Best regards,
> >>>>> >>>>> Mina
> >>>>> >>>>>
> >>>>> >>>>> import org.apache.kafka.common.serialization.*;
> >>>>> >>>>> import org.apache.kafka.streams.*;
> >>>>> >>>>> import org.apache.kafka.streams.kstream.*;
> >>>>> >>>>>
> >>>>> >>>>> import java.util.*;
> >>>>> >>>>> import java.util.regex.*;
> >>>>> >>>>>
> >>>>> >>>>> public class WordCountExample {
> >>>>> >>>>>
> >>>>> >>>>>
> >>>>> >>>>>   public static void main(String [] args)   {
> >>>>> >>>>>      final Properties streamsConfiguration = new Properties();
> >>>>> >>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>>> >>>>>          "wordcount-streaming");
> >>>>> >>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>> >>>>>          "<IPADDRESS>:9092");
> >>>>> >>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>> >>>>>          Serdes.String().getClass().getName());
> >>>>> >>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>> >>>>>          Serdes.String().getClass().getName());
> >>>>> >>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >>>>> >>>>>          10 * 1000);
> >>>>> >>>>>
> >>>>> >>>>>      final Serde<String> stringSerde = Serdes.String();
> >>>>> >>>>>      final Serde<Long> longSerde = Serdes.Long();
> >>>>> >>>>>
> >>>>> >>>>>      final KStreamBuilder builder = new KStreamBuilder();
> >>>>> >>>>>
> >>>>> >>>>>      final KStream<String, String> textLines =
> >>>>> >>>>>          builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>>> >>>>>
> >>>>> >>>>>      final Pattern pattern = Pattern.compile("\\W+",
> >>>>> >>>>>          Pattern.UNICODE_CHARACTER_CLASS);
> >>>>> >>>>>
> >>>>> >>>>>      final KStream<String, Long> wordCounts = textLines
> >>>>> >>>>>            .flatMapValues(value ->
> >>>>> >>>>>                Arrays.asList(pattern.split(value.toLowerCase())))
> >>>>> >>>>>            .groupBy((key, word) -> word)
> >>>>> >>>>>            .count("Counts")
> >>>>> >>>>>            .toStream();
> >>>>> >>>>>
> >>>>> >>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>>> >>>>>
> >>>>> >>>>>      final KafkaStreams streams = new KafkaStreams(builder,
> >>>>> >>>>>          streamsConfiguration);
> >>>>> >>>>>      streams.cleanUp();
> >>>>> >>>>>      streams.start();
> >>>>> >>>>>
> >>>>> >>>>>      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >>>>> >>>>>   }
> >>>>> >>>>> }
> >>>>> >>>>>
> >>>>> >>>>
> >>>>> >>>>
> >>>>> >>>
> >>>>> >>
> >>>>> >>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>



-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>
