Hi group,

We have the graph below, to which we added metrics for latency calculation.
We have two streams, each consumed by two operators:

*         ordersStream and pricesStream are both consumed by two operators, CoMapperA and CoMapperB, each of which uses connect.


Initially we thought that a stream consumed by two operators needs to be duplicated into two separate streams, so we did that using split, as shown below.
Then we understood this is not required: two operators can consume the same stream, so we removed the duplication.
However, when measuring latency, we found that latency with the duplicated streams was much better than without duplication (roughly twice as good).

My questions:

*         Is the improved latency related to checkpointing being done separately on those streams?

*         What are the cons of duplicating the streams, given that it gives better latency? Are we harming state correctness in any way?

Additional info:
The two graph configurations appear exactly the same in the execution plan / web UI:


[Execution plan / web UI screenshots, not preserved. Node labels:]

        [sourceOrders.keyBy,CoMapperA,OrdersStreams]
        [sourcePrices.keyBy,CoMapperB,pricesStreams]

Code without duplication looks something like:
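// Key both streams by the same field so matching orders and prices meet in the same subtask
// (keyBy with a field expression returns KeyedStream<T, Tuple>)
KeyedStream<Order, Tuple> orderKeyedStream = ordersStream.keyBy(field);
KeyedStream<Price, Tuple> pricesKeyedStream = pricesStream.keyBy(field);

// Both co-operators consume the same two keyed streams
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);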
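Both versions key the two inputs by the same field before connecting them, which is what lets a CoMapper see the orders and prices for one key in the same parallel instance. The dependency-free Java sketch below models that routing in a simplified way, assigning a key to a subtask with `Math.floorMod(key.hashCode(), parallelism)`. That formula is an assumption for illustration only (Flink actually hashes keys into key groups first), but the property shown, that equal keys from either input land on the same subtask, is the relevant one. `Order`, `Price`, `symbol` and `KeyRoutingModel` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRoutingModel {

    // Hypothetical record types; "symbol" stands in for the shared key field
    record Order(String symbol, int quantity) {}
    record Price(String symbol, double value) {}

    // Simplified stand-in for key-to-subtask assignment (NOT Flink's real
    // key-group algorithm): equal keys always map to the same subtask index
    public static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<Order> orders = List.of(new Order("AAPL", 10), new Order("MSFT", 5));
        List<Price> prices = List.of(new Price("AAPL", 170.5), new Price("MSFT", 310.0));

        // Route each input independently, as keyBy does on each side of connect()
        List<Integer> orderTargets = new ArrayList<>();
        for (Order o : orders) {
            orderTargets.add(subtaskFor(o.symbol(), parallelism));
        }
        List<Integer> priceTargets = new ArrayList<>();
        for (Price p : prices) {
            priceTargets.add(subtaskFor(p.symbol(), parallelism));
        }

        // The order and the price for the same symbol end up in the same subtask
        System.out.println(orderTargets.equals(priceTargets)); // true
    }
}
```

Since the duplicated version also keys both sides by the same field, this co-location holds there as well.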


Code used for duplication:
(We duplicate the streams and then connect pricesStreamA with ordersStreamA, and pricesStreamB with ordersStreamB, with keyBy applied as part of connect, similar to the code above.)
        // Duplicate the prices stream: the selector returns both names for every
        // price, so each record is routed to both named outputs
        SplitStream<Price> pricesStreams = pricesStream
                .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));

        DataStream<Price> pricesStreamA = pricesStreams.select("pricesStreamA");
        DataStream<Price> pricesStreamB = pricesStreams.select("pricesStreamB");
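To make the routing in the split/select code concrete, here is a dependency-free Java model of it. It is a simplification, not Flink's implementation: the selector returns a list of output names for each record, and the record is delivered to every output it names. Because the selector above returns both names for every price, each price is copied to both pricesStreamA and pricesStreamB. `select` matches names exactly, so an output name with stray whitespace, such as `"pricesStreamA "`, would receive nothing when selected as `"pricesStreamA"`. `SplitSelectModel` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SplitSelectModel {

    // Simplified model of split: the selector names the outputs a record goes to,
    // and the record is appended to each of those named outputs
    public static <T> Map<String, List<T>> split(List<T> input,
                                                 Function<T, List<String>> selector) {
        Map<String, List<T>> outputs = new LinkedHashMap<>();
        for (T record : input) {
            for (String name : selector.apply(record)) {
                outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(record);
            }
        }
        return outputs;
    }

    // Model of select(name): exact string match; unknown names yield an empty stream
    public static <T> List<T> select(Map<String, List<T>> outputs, String name) {
        return outputs.getOrDefault(name, List.of());
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(170.5, 310.0);

        // Every price is routed to both names, i.e. the stream is duplicated
        Map<String, List<Double>> split =
                split(prices, p -> List.of("pricesStreamA", "pricesStreamB"));
        System.out.println(select(split, "pricesStreamA")); // [170.5, 310.0]
        System.out.println(select(split, "pricesStreamB")); // [170.5, 310.0]

        // A name with a trailing space does not match the selected name
        Map<String, List<Double>> typo = split(prices, p -> List.of("pricesStreamA "));
        System.out.println(select(typo, "pricesStreamA")); // []
    }
}
```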


Thanks,
Tovi




