Hi, in your first example with .print() "before" and "after" .to() I want to clarify, that the order you use to add operators does not really matter here. The DAG you build will branch out anyway:
+--> print() | KTable -> changelog --+--> to() | +--> print() For the second example your DAG locks like: KTable -> changelog --> to() -> TOPIC -> table() -> changelog -> print() This should eventually also print, but latency might be higher as you write-read via topic. Also consider, that we use some KTable caches that might delay print() to trigger: http://docs.confluent.io/current/streams/developer-guide.html#memory-management Hope this helps. -Matthias On 6/12/17 8:20 AM, john cheng wrote: > Hi there, I'm testing Kafka Streams's print() method, here is the code: > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dsl-wc1"); > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); > KStream<String, String> source = builder.stream("dsl-input1"); > KTable<String, Long> countTable = source > .flatMapValues(value -> Arrays.asList(value.split(" "))) > .map((key,value) -> KeyValue.pair(value, value)) > .groupByKey().count("Counts1"); > countTable.print(); // [KSTREAM-AGGREGATE-0000000003]: hello , > (1<-null).... > countTable.to(Serdes.String(), Serdes.Long(), "ktable-output1"); > countTable.print(); // print after to, output is the same as print > before to > > This App output as I expected, no matter print called before to() or after > to(). > > [KSTREAM-AGGREGATE-0000000003]: hello , (1<-null) > [KSTREAM-AGGREGATE-0000000003]: kafka , (1<-null) > [KSTREAM-AGGREGATE-0000000003]: hello , (2<-null) > [KSTREAM-AGGREGATE-0000000003]: kafka , (2<-null) > [KSTREAM-AGGREGATE-0000000003]: streams , (1<-null) > > Then I replace countTable.print() with below code: > > countTable.to(Serdes.String(), Serdes.Long(), "ktable-output1"); > KTable<String, Long> table1 = builder.table(Serdes.String(), > Serdes.Long(), "ktable-output1", "Counts_1"); > table1.print(); > > But things happen strange. When first time startup the app, there're no > output. > After stop this app and restarted, The second time running app, there're > output now. > > [KTABLE-SOURCE-0000000010]: hello , (1<-null) > [KTABLE-SOURCE-0000000010]: kafka , (1<-null) > [KTABLE-SOURCE-0000000010]: hello , (2<-null) > [KTABLE-SOURCE-0000000010]: kafka , (2<-null) > [KTABLE-SOURCE-0000000010]: streams , (1<-null) > > And when I append two message to input-topic, there're outpout too. > > [KTABLE-SOURCE-0000000010]: kafka , (3<-null) > [KTABLE-SOURCE-0000000010]: streams , (2<-null) > > MyQuestion is why no output present at first time? Did I miss something? > > PS: If I have two app, the first app is just: > countTable.to("ktable-output1") > and the second app is: builder.table("ktable-output1").print() > The result is also like what I observed before. >
signature.asc
Description: OpenPGP digital signature