Hi,

regarding your first example, with .print() "before" and "after" .to(): I
want to clarify that the order in which you add operators does not really
matter here. The DAG you build will branch out anyway:

                      +--> print()
                      |
KTable -> changelog --+--> to()
                      |
                      +--> print()
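
To make the branching concrete, here is a toy model in plain Java. It is
not the Kafka Streams API: Node, run() and the "hello=1" style updates are
made-up names for illustration only. It only shows that when every child of
the changelog node receives every update, the order in which the children
were attached does not change what print() sees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BranchingDagSketch {

    /** Hypothetical stand-in for the changelog node: forwards each update to every child. */
    static final class Node {
        private final List<Consumer<String>> children = new ArrayList<>();

        void addChild(Consumer<String> child) {
            children.add(child);
        }

        void forward(String update) {
            for (Consumer<String> child : children) {
                child.accept(update);
            }
        }
    }

    /** Builds the toy DAG with print attached before or after to, and returns what print saw. */
    static List<String> run(boolean printBeforeTo) {
        List<String> printed = new ArrayList<>(); // what the print() branch receives
        List<String> topic = new ArrayList<>();   // what the to() branch receives

        Consumer<String> print = printed::add;
        Consumer<String> to = topic::add;

        Node changelog = new Node();
        if (printBeforeTo) {
            changelog.addChild(print);
            changelog.addChild(to);
        } else {
            changelog.addChild(to);
            changelog.addChild(print);
        }

        changelog.forward("hello=1");
        changelog.forward("kafka=1");
        return printed;
    }

    public static void main(String[] args) {
        // Both orderings deliver the same updates to the print branch.
        System.out.println(run(true).equals(run(false)));
    }
}
```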

For the second example, your DAG looks like:

KTable -> changelog --> to() -> TOPIC -> table() -> changelog -> print()

This should eventually also print, but latency might be higher because you
write to and read back from a topic. Also consider that we use KTable
caches that might delay when print() triggers:
http://docs.confluent.io/current/streams/developer-guide.html#memory-management
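
As a rough sketch of the settings involved (assuming the standard config
keys "cache.max.bytes.buffering" and "commit.interval.ms", which I believe
correspond to StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG): disabling the cache and
committing more often should make updates show up sooner, at the cost of
more downstream records. The class and method names and the value 100 are
placeholders for illustration, not recommendations.

```java
import java.util.Properties;

public class CacheSettingsSketch {

    /** Returns illustrative low-latency settings; pass them on to your StreamsConfig. */
    static Properties lowLatencyProps() {
        Properties props = new Properties();
        // 0 bytes disables the record cache, so every update is forwarded downstream.
        props.put("cache.max.bytes.buffering", 0);
        // Caches are also flushed on commit; a shorter interval flushes more often.
        // 100 ms is an arbitrary example value.
        props.put("commit.interval.ms", 100);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(lowLatencyProps());
    }
}
```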


Hope this helps.


-Matthias

On 6/12/17 8:20 AM, john cheng wrote:
> Hi there, I'm testing Kafka Streams's print() method, here is the code:
> 
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-wc1");
>         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>         KStream<String, String> source = builder.stream("dsl-input1");
>         KTable<String, Long> countTable = source
>                 .flatMapValues(value -> Arrays.asList(value.split(" ")))
>                 .map((key,value) -> KeyValue.pair(value, value))
>                 .groupByKey().count("Counts1");
>         countTable.print();  // [KSTREAM-AGGREGATE-0000000003]: hello , (1<-null)....
>         countTable.to(Serdes.String(), Serdes.Long(), "ktable-output1");
>         countTable.print();  // print after to, output is the same as print before to
> 
> This app's output is what I expected, no matter whether print() is called
> before or after to().
> 
>         [KSTREAM-AGGREGATE-0000000003]: hello , (1<-null)
>         [KSTREAM-AGGREGATE-0000000003]: kafka , (1<-null)
>         [KSTREAM-AGGREGATE-0000000003]: hello , (2<-null)
>         [KSTREAM-AGGREGATE-0000000003]: kafka , (2<-null)
>         [KSTREAM-AGGREGATE-0000000003]: streams , (1<-null)
> 
> Then I replaced countTable.print() with the code below:
> 
>         countTable.to(Serdes.String(), Serdes.Long(), "ktable-output1");
>         KTable<String, Long> table1 = builder.table(Serdes.String(),
>                 Serdes.Long(), "ktable-output1", "Counts_1");
>         table1.print();
> 
> But then something strange happens. The first time I start the app, there
> is no output.
> After stopping and restarting the app, the second run does produce
> output:
> 
>     [KTABLE-SOURCE-0000000010]: hello , (1<-null)
>     [KTABLE-SOURCE-0000000010]: kafka , (1<-null)
>     [KTABLE-SOURCE-0000000010]: hello , (2<-null)
>     [KTABLE-SOURCE-0000000010]: kafka , (2<-null)
>     [KTABLE-SOURCE-0000000010]: streams , (1<-null)
> 
> And when I append two messages to the input topic, there is output too.
> 
>     [KTABLE-SOURCE-0000000010]: kafka , (3<-null)
>     [KTABLE-SOURCE-0000000010]: streams , (2<-null)
> 
> My question is: why is there no output the first time? Did I miss something?
> 
> PS: If I have two apps, where the first app is just:
> countTable.to("ktable-output1")
> and the second app is: builder.table("ktable-output1").print()
> the result is the same as what I observed before.
> 
