Hi group,

We have the graph below, to which we added metrics for latency calculation.

We have two streams, ordersStream and pricesStream. Both are consumed by two operators, CoMapperA and CoMapperB, each using connect.

Initially we thought that when a stream is consumed by two operators, we need to duplicate it into two separate streams, so we did that using split, as shown below. Then we understood that this is not a must and that two operators can consume the same stream, so we removed the duplication. However, when checking latency, we found that latency with the duplicated streams was much better than without duplication (by about a factor of two).

My questions:
* Is the improved latency related to checkpointing happening separately on those streams?
* What are the cons of using the duplication if it has better latency? Are we harming state correctness in any way?

Additional info: the two graph configurations appear exactly the same in the execution plan / web UI:

[image: sourceOrders.keyBy, CoMapperA, OrdersStreams]
[image: sourcePrices.keyBy, CoMapperB, pricesStreams]

Code without duplication looks something like:

    KeyedStream<Order> orderKeyedStream = ordersStream.keyBy(field);
    KeyedStream<Price> pricesKeyedStream = pricesStream.keyBy(field);

    orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
    orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);

Code used for duplication (we duplicate the streams and then connect pricesStreamA with ordersStreamA, and pricesStreamA with ordersStreamB, with keyBy as part of the connect, similar to above):

    // duplicate the prices stream: every element is tagged with both output names
    SplitStream<Price> pricesStreams = pricesStream.split(
        price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));

    DataStream<Price> pricesStreamA = pricesStreams.select("pricesStreamA");
    DataStream<Price> pricesStreamB = pricesStreams.select("pricesStreamB");

Thanks,
Tovi