Hi Mina,

You may need to set this property in the Kafka broker config file
(server.properties):
advertised.listeners=PLAINTEXT://your.host.name:9092


The problem might be this: Kafka runs inside docker, and from inside that
container its address is localhost:9092. Great. Now say you run the streams
app in another container or on your local laptop. If you point streams at
localhost:9092, that is not where Kafka is running, so you need to point your
streams app at the address of the Kafka container. That's the second step.
The first step is to have Kafka advertise that address to the streams app,
which you do by setting the property above. Example:

advertised.listeners=PLAINTEXT://123.45.67:9092

Then when you run the streams app, you pass in 123.45.67:9092 as its
bootstrap server.
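
In case it helps, here is a rough sketch of the second step on the streams
side. It is only an illustration under my assumptions: the class and method
names (StreamsBootstrapSketch, streamsProps, canConnect) are made up, the
string keys are the standard application.id and bootstrap.servers settings,
and your.host.name:9092 stands for whatever you put in advertised.listeners.
The canConnect helper is just a plain TCP check that the advertised host and
port are reachable from where the app runs; it does not prove the broker is
healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Properties;

// Hypothetical helper class, not part of Kafka or the Confluent examples.
final class StreamsBootstrapSketch {

    // Builds the two settings that matter here. The value of
    // bootstrap.servers should be the host:port from advertised.listeners,
    // not localhost (unless the app really runs where the broker is local).
    static Properties streamsProps(String applicationId, String advertisedHostPort) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", advertisedHostPort);
        return props;
    }

    // Plain TCP reachability check: true if a connection to host:port can be
    // opened within the timeout, false on any I/O failure (unknown host,
    // connection refused, timeout).
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder address: replace with your advertised host and port.
        Properties props = streamsProps("wordcount-streaming", "your.host.name:9092");
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
        System.out.println("reachable = " + canConnect("your.host.name", 9092, 2000));
    }
}
```

If the reachability check fails from the machine where the streams app runs,
the app will not be able to talk to the broker either, whatever the rest of
its configuration says.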

Thanks
Eno

> On Mar 15, 2017, at 5:14 AM, Mina Aslani <aslanim...@gmail.com> wrote:
> 
> Hi,
> I just checked streams-wordcount-output topic using below command
> 
> docker run \
>  --net=host \
>  --rm \
>  confluentinc/cp-kafka:3.2.0 \
>  kafka-console-consumer --bootstrap-server localhost:9092 \
>          --topic streams-wordcount-output \
>          --from-beginning \
>          --formatter kafka.tools.DefaultMessageFormatter \
>          --property print.key=true \
>          --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>          --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
> 
> 
> and it returns
> 
> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
> 
> Please note that the above result is from when I tried
> http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
> and, in docker-machine, ran
> /usr/bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo.
> 
> Why does running the same program outside docker-machine not write to the
> output topic?
> Should I package the program as a jar, deploy it to docker-machine, and run
> it using ./bin/kafka-run-class?
> 
> Best regards,
> Mina
> 
> 
> 
> On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani <aslanim...@gmail.com> wrote:
> 
>> I even tried
>> http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-of-this-quickstart
>> 
>> and in docker-machine  ran /usr/bin/kafka-run-class
>> org.apache.kafka.streams.examples.wordcount.WordCountDemo
>> 
>> Running
>> 
>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
>>   kafka-console-consumer --bootstrap-server localhost:9092 \
>>   --topic streams-wordcount-output --new-consumer --from-beginning
>> 
>> shows 8 blank messages
>> 
>> Is there any setting/configuration that should be done? Running the class
>> in the docker-machine and running the program outside the docker-machine
>> do not return the expected result!
>> 
>> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani <aslanim...@gmail.com> wrote:
>> 
>>> And the port for kafka is 29092 and for zookeeper 32181.
>>> 
>>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani <aslanim...@gmail.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I forgot to add in my previous email 2 questions.
>>>> 
>>>> To set up my env, shall I use
>>>> https://raw.githubusercontent.com/confluentinc/cp-docker-images/master/examples/kafka-single-node/docker-compose.yml
>>>> instead, or is there any other docker-compose.yml (version 2 or 3) that
>>>> is suggested for setting up the env?
>>>> 
>>>> How can I check "whether streams (that is just an app) can reach Kafka"?
>>>> 
>>>> Regards,
>>>> Mina
>>>> 
>>>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani <aslanim...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Eno,
>>>>> 
>>>>> Sorry! That is a typo!
>>>>> 
>>>>> I have a docker-machine with different containers (setup as directed @
>>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
>>>>> 
>>>>> docker ps --format "{{.Image}}: {{.Names}}"
>>>>> 
>>>>> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
>>>>> 
>>>>> confluentinc/cp-enterprise-control-center:3.2.0: control-center
>>>>> 
>>>>> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
>>>>> 
>>>>> confluentinc/cp-schema-registry:3.2.0: schema-registry
>>>>> 
>>>>> confluentinc/cp-kafka:3.2.0: kafka
>>>>> 
>>>>> confluentinc/cp-zookeeper:3.2.0: zookeeper
>>>>> 
>>>>> I used the example @
>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>>> and followed the same steps.
>>>>> 
>>>>> When I run below command in docker-machine, I see the messages in
>>>>> TextLinesTopic.
>>>>> 
>>>>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
>>>>>   kafka-console-consumer --bootstrap-server localhost:29092 \
>>>>>   --topic TextLinesTopic --new-consumer --from-beginning
>>>>> 
>>>>> hello kafka streams
>>>>> 
>>>>> all streams lead to kafka
>>>>> 
>>>>> join kafka summit
>>>>> 
>>>>> test1
>>>>> 
>>>>> test2
>>>>> 
>>>>> test3
>>>>> 
>>>>> test4
>>>>> 
>>>>> Running the above command for WordsWithCountsTopic returns nothing.
>>>>> 
>>>>> My program runs outside of docker-machine, and it does not return any
>>>>> error.
>>>>> 
>>>>> I checked the kafka and kafka-connect logs, and no information is shown.
>>>>> I am wondering what the log level is in kafka/kafka-connect.
>>>>> 
>>>>> 
>>>>> Best regards,
>>>>> Mina
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Mar 14, 2017 at 3:57 PM, Eno Thereska <eno.there...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi there,
>>>>>> 
>>>>>> I noticed in your example that you are using localhost:9092 to produce
>>>>>> but localhost:29092 to consume? Or is that a typo? Are zookeeper, kafka,
>>>>>> and the Kafka Streams app all running within one docker container, or in
>>>>>> different containers?
>>>>>> 
>>>>>> I just tested the WordCountLambdaExample and it works for me. This
>>>>>> might not have anything to do with streams, but rather with the Kafka
>>>>>> configuration and whether streams (that is just an app) can reach Kafka
>>>>>> at all. If you provide the above information we can look further.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Thanks
>>>>>> Eno
>>>>>> 
>>>>>>> On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
>>>>>>> 
>>>>>>> I reset and still not working!
>>>>>>> 
>>>>>>> My env is setup using
>>>>>>> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
>>>>>>> 
>>>>>>> I just tried using
>>>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>>>>> with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic)
>>>>>>> created from scratch, going through the steps as directed.
>>>>>>> 
>>>>>>> When I stopped the java program and checked the topics, below is the
>>>>>>> data in each topic.
>>>>>>> 
>>>>>>> docker run \
>>>>>>> --net=host \
>>>>>>> --rm \
>>>>>>> confluentinc/cp-kafka:3.2.0 \
>>>>>>> kafka-console-consumer --bootstrap-server localhost:29092 \
>>>>>>> --topic TextLinesTopic --new-consumer --from-beginning
>>>>>>> 
>>>>>>> 
>>>>>>> SHOWS
>>>>>>> 
>>>>>>> hello kafka streams
>>>>>>> 
>>>>>>> all streams lead to kafka
>>>>>>> 
>>>>>>> join kafka summit
>>>>>>> 
>>>>>>> test1
>>>>>>> 
>>>>>>> test2
>>>>>>> 
>>>>>>> test3
>>>>>>> 
>>>>>>> test4
>>>>>>> 
>>>>>>> FOR WordsWithCountsTopic nothing is shown
>>>>>>> 
>>>>>>> 
>>>>>>> I am new to Kafka/Kafka Streams and still do not understand why a
>>>>>>> simple example does not work!
>>>>>>> 
>>>>>>> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>> 
>>>>>>>>>> So, when I check the number of messages in wordCount-input I see the
>>>>>>>>>> same messages. However, when I run below code I do not see any
>>>>>>>>>> message/data in wordCount-output.
>>>>>>>> 
>>>>>>>> Did you reset your application?
>>>>>>>> 
>>>>>>>> Each time you run your app and restart it, it will resume processing
>>>>>>>> where it left off. Thus, if something went wrong in your first run but
>>>>>>>> you got committed offsets, the app will not re-read the whole topic.
>>>>>>>> 
>>>>>>>> You can check committed offsets via bin/kafka-consumer-groups.sh. The
>>>>>>>> application.id from StreamsConfig is used as the group.id.
>>>>>>>> 
>>>>>>>> Thus, resetting your app would be required to consume the input topic
>>>>>>>> from scratch. Or you just write new data to your input topic.
>>>>>>>> 
>>>>>>>>>> Can I connect to kafka in VM/docker container using below code or do I
>>>>>>>>>> need to change/add other parameters? How can I submit the code to
>>>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>>>>>>>>> code(e.g. jar file)?
>>>>>>>> 
>>>>>>>> A Streams app is a regular Java application and can run anywhere --
>>>>>>>> there is no notion of a processing cluster and you don't "submit" your
>>>>>>>> code -- you just run your app.
>>>>>>>> 
>>>>>>>> Thus, if your console consumer can connect to the cluster, your Streams
>>>>>>>> app should also be able to connect to the cluster.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Maybe the short runtime of 5 seconds could be a problem (even if it
>>>>>>>> seems long to process just a few records). But you might need to take
>>>>>>>> startup delay into account. I would recommend registering a shutdown
>>>>>>>> hook: see
>>>>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Hope this helps.
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>> 
>>>>>>>>> Thank you for the quick response, appreciate it!
>>>>>>>>> 
>>>>>>>>> I created the topics wordCount-input and wordCount-output. Pushed some
>>>>>>>>> data to wordCount-input using
>>>>>>>>> 
>>>>>>>>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
>>>>>>>>>   /bin/kafka-console-producer --broker-list localhost:9092 \
>>>>>>>>>   --topic wordCount-input
>>>>>>>>> 
>>>>>>>>> test
>>>>>>>>> 
>>>>>>>>> new
>>>>>>>>> 
>>>>>>>>> word
>>>>>>>>> 
>>>>>>>>> count
>>>>>>>>> 
>>>>>>>>> wordcount
>>>>>>>>> 
>>>>>>>>> word count
>>>>>>>>> 
>>>>>>>>> So, when I check the number of messages in wordCount-input I see the same
>>>>>>>>> messages. However, when I run below code I do not see any message/data in
>>>>>>>>> wordCount-output.
>>>>>>>>> 
>>>>>>>>> Can I connect to kafka in VM/docker container using below code or do I
>>>>>>>>> need to change/add other parameters? How can I submit the code to
>>>>>>>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>>>>>>>> code(e.g. jar file)?
>>>>>>>>> 
>>>>>>>>> I really appreciate your input as I am blocked and cannot run even below
>>>>>>>>> simple example.
>>>>>>>>> 
>>>>>>>>> Best regards,
>>>>>>>>> Mina
>>>>>>>>> 
>>>>>>>>> I changed the code to be as below:
>>>>>>>>> 
>>>>>>>>> Properties props = new Properties();
>>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>>>     Serdes.String().getClass().getName());
>>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>>>     Serdes.String().getClass().getName());
>>>>>>>>> 
>>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>>>>>>> 
>>>>>>>>> // setting offset reset to earliest so that we can re-run the demo
>>>>>>>>> // code with the same pre-loaded data
>>>>>>>>> // Note: To re-run the demo, you need to use the offset reset tool:
>>>>>>>>> // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>>> 
>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>> 
>>>>>>>>> KStream<String, String> source = builder.stream("wordCount-input");
>>>>>>>>> 
>>>>>>>>> KTable<String, Long> counts = source
>>>>>>>>>     .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>>>>>>>        @Override
>>>>>>>>>        public Iterable<String> apply(String value) {
>>>>>>>>>           return Arrays.asList(
>>>>>>>>>               value.toLowerCase(Locale.getDefault()).split(" "));
>>>>>>>>>        }
>>>>>>>>>     }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>>>>>>>>>        @Override
>>>>>>>>>        public KeyValue<String, String> apply(String key, String value) {
>>>>>>>>>           return new KeyValue<>(value, value);
>>>>>>>>>        }
>>>>>>>>>     })
>>>>>>>>>     .groupByKey()
>>>>>>>>>     .count("Counts");
>>>>>>>>> 
>>>>>>>>> // need to override value serde to Long type
>>>>>>>>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>>>>>>>> 
>>>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>>>> streams.start();
>>>>>>>>> 
>>>>>>>>> // usually the stream application would be running forever,
>>>>>>>>> // in this example we just let it run for some time and stop since the
>>>>>>>>> // input data is finite.
>>>>>>>>> Thread.sleep(5000L);
>>>>>>>>> 
>>>>>>>>> streams.close();
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>> 
>>>>>>>>>> Maybe you need to reset your application using the reset tool:
>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>>>>>>>>>> 
>>>>>>>>>> Also keep in mind that KTables buffer internally, and thus you might
>>>>>>>>>> only see data on commit.
>>>>>>>>>> 
>>>>>>>>>> Try to reduce commit interval or disable caching by setting
>>>>>>>>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> This is the first time that I am using Kafka Streams. I would like to
>>>>>>>>>>> read from an input topic and write to an output topic. However, I do
>>>>>>>>>>> not see the word count when I try to run the example below. It looks
>>>>>>>>>>> like it does not connect to Kafka. I do not see any error though. I
>>>>>>>>>>> tried my localhost kafka as well as the container in a VM, same
>>>>>>>>>>> situation.
>>>>>>>>>>> 
>>>>>>>>>>> There are over 200 messages in the input kafka topic.
>>>>>>>>>>> 
>>>>>>>>>>> Your input is appreciated!
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Mina
>>>>>>>>>>> 
>>>>>>>>>>> import org.apache.kafka.common.serialization.*;
>>>>>>>>>>> import org.apache.kafka.streams.*;
>>>>>>>>>>> import org.apache.kafka.streams.kstream.*;
>>>>>>>>>>> 
>>>>>>>>>>> import java.util.*;
>>>>>>>>>>> import java.util.regex.*;
>>>>>>>>>>> 
>>>>>>>>>>> public class WordCountExample {
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>  public static void main(String [] args)   {
>>>>>>>>>>>     final Properties streamsConfiguration = new Properties();
>>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>>>>>>>>>           "wordcount-streaming");
>>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>>>>>>>>           "<IPADDRESS>:9092");
>>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>>>>>           Serdes.String().getClass().getName());
>>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>>>>>           Serdes.String().getClass().getName());
>>>>>>>>>>>     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>>>>>>>>>           10 * 1000);
>>>>>>>>>>> 
>>>>>>>>>>>     final Serde<String> stringSerde = Serdes.String();
>>>>>>>>>>>     final Serde<Long> longSerde = Serdes.Long();
>>>>>>>>>>> 
>>>>>>>>>>>     final KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>>>> 
>>>>>>>>>>>     final KStream<String, String> textLines =
>>>>>>>>>>>           builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>>>>>>>>> 
>>>>>>>>>>>     final Pattern pattern = Pattern.compile("\\W+",
>>>>>>>>>>>           Pattern.UNICODE_CHARACTER_CLASS);
>>>>>>>>>>> 
>>>>>>>>>>>     final KStream<String, Long> wordCounts = textLines
>>>>>>>>>>>           .flatMapValues(value ->
>>>>>>>>>>>                 Arrays.asList(pattern.split(value.toLowerCase())))
>>>>>>>>>>>           .groupBy((key, word) -> word)
>>>>>>>>>>>           .count("Counts")
>>>>>>>>>>>           .toStream();
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>     wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>>>>>>>>> 
>>>>>>>>>>>     final KafkaStreams streams = new KafkaStreams(builder,
>>>>>>>>>>>           streamsConfiguration);
>>>>>>>>>>>     streams.cleanUp();
>>>>>>>>>>>     streams.start();
>>>>>>>>>>> 
>>>>>>>>>>>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>>>>>>>>>  }
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
