Hi Timo,

Actually, I do keyBy in both cases, and in the split/duplicate case I do it on 
both split streams.

I did call connect twice below rather than once, but connect only invokes the 
ConnectedStreams constructor and doesn't perform any actual operation, so I 
don't see how it would make a difference.
I can try it if you see a reason.


More detailed code, including all keyBy calls:

Code without duplication looks something like:
// keyBy(String field) returns a KeyedStream whose key type is Tuple
KeyedStream<Order, Tuple> orderKeyedStream = ordersStream.keyBy(field);
KeyedStream<Price, Tuple> pricesKeyedStream = pricesStream.keyBy(field);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);


Code with duplication (better latency):
(We duplicate both streams, then connect pricesStreamA with orderStreamA and 
pricesStreamB with orderStreamB, applying keyBy as part of connect, similar to 
above.)
// duplicate the prices stream: every price is routed to both named outputs
SplitStream<Price> pricesSplitStream = pricesStream
        .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));
DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");
// duplicate the orders stream: every order is routed to both named outputs
SplitStream<Order> ordersSplitStream = ordersStream
        .split(order -> ImmutableList.of("orderStreamA", "orderStreamB"));
DataStream<Order> orderStreamA = ordersSplitStream.select("orderStreamA");
DataStream<Order> orderStreamB = ordersSplitStream.select("orderStreamB");

// keyBy(field1, field2) keys the first input by field1 and the second by field2
DataStream<Order> priceOrdersConnectedStream = orderStreamA
        .connect(pricesStreamA)
        .keyBy("priceId", "priceId")
        .flatMap(mapperA);
DataStream<Price> orderPricesConnectedStream = orderStreamB
        .connect(pricesStreamB)
        .keyBy("orderId", "orderId")
        .flatMap(mapperB);
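To make the duplication semantics concrete, here is a simplified plain-Java model of split/select. The class and method names are hypothetical and this is not Flink's implementation: each record is delivered to every output name the selector returns, and select keeps the records tagged with exactly that name. Assuming names are matched as exact strings (as in this model), a stray space such as "pricesStreamA " would leave the selected stream empty.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, simplified model of split/select routing (not Flink code). */
public class SplitSelectModel {

    /** Routes every record to each output name the selector returns. */
    public static <T> Map<String, List<T>> split(List<T> input, Function<T, List<String>> selector) {
        Map<String, List<T>> outputs = new LinkedHashMap<>();
        for (T record : input) {
            for (String name : selector.apply(record)) {
                outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(record);
            }
        }
        return outputs;
    }

    /** Returns the records tagged with exactly this name, or an empty list. */
    public static <T> List<T> select(Map<String, List<T>> outputs, String name) {
        return outputs.getOrDefault(name, List.of());
    }

    public static void main(String[] args) {
        List<String> prices = List.of("p1", "p2", "p3");

        // Returning both names duplicates every record into both outputs.
        Map<String, List<String>> ok = split(prices, p -> List.of("pricesStreamA", "pricesStreamB"));
        System.out.println(select(ok, "pricesStreamA")); // [p1, p2, p3]
        System.out.println(select(ok, "pricesStreamB")); // [p1, p2, p3]

        // A trailing space in the tag means the exact-match select finds nothing.
        Map<String, List<String>> typo = split(prices, p -> List.of("pricesStreamA ", "pricesStreamB"));
        System.out.println(select(typo, "pricesStreamA")); // []
    }
}
```

Each input record is copied once per returned name, which is why returning both names yields two full copies of the stream.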



From: Timo Walther [mailto:twal...@apache.org]
Sent: Wednesday, 03 January 2018 11:02
To: user@flink.apache.org
Subject: Re: Two operators consuming from same stream

Hi Tovi,

I think your code without duplication performs two separate shuffle operations 
whereas the other code only performs one shuffle.

Additional latency might be caused by the overhead of maintaining the 
partitioning for a keyed stream (key groups) and of switching key contexts in 
the operator.
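The key-group partitioning mentioned above can be sketched in plain Java. This is a simplified model under stated assumptions: Flink applies a murmur hash to key.hashCode() before taking the modulo, whereas this sketch uses Math.floorMod(key.hashCode(), maxParallelism) as a stand-in, so concrete group numbers will differ from a real job. The mapping from key group to parallel operator instance (keyGroup * parallelism / maxParallelism) follows Flink's range assignment. The class name and the sample price ids are hypothetical.

```java
/** Hypothetical, simplified sketch of key-group partitioning for a keyed stream. */
public class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key groups (simplified hash, not murmur). */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel operator instance that owns it. */
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        int parallelism = 4;      // example value
        for (String priceId : new String[] {"EURUSD", "GBPUSD", "USDJPY"}) {
            int group = keyGroupFor(priceId, maxParallelism);
            int operator = operatorIndexFor(group, maxParallelism, parallelism);
            System.out.println(priceId + " -> key group " + group + " -> operator " + operator);
        }
    }
}
```

Every record on a keyed edge goes through this routing step before reaching its operator instance, and the operator then switches to that record's key context before touching keyed state.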

Did you check the latency of the following?

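// identityMapper forwards each input unchanged; Either<Order, Price> is one
// possible common output type (assumption)
DataStream<Either<Order, Price>> ds =
        orderKeyedStream.connect(pricesKeyedStream).flatMap(identityMapper);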
ds.flatMap(mapperA);
ds.flatMap(mapperB);

Regards,
Timo


On 1/1/18 at 2:50 PM, Sofer, Tovi wrote:
Hi group,

We have the graph below, to which we added metrics for latency calculation.
We have two streams, each consumed by two operators:

· ordersStream and pricesStream are both consumed by two operators, CoMapperA 
and CoMapperB, each using connect.


Initially we thought that a stream consumed by two operators had to be 
duplicated into two separate streams, so we did that using split, as shown 
below.
Then we understood that this is not required and two operators can consume the 
same stream, so we removed the duplication.
However, when checking latency, we found that latency with duplicated streams 
was much better than without duplication (roughly a factor of two).
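To compare the two variants beyond a single "roughly a factor of two" figure, one option is to compare percentiles of the recorded latency samples, so that tail latency is visible too. Below is a minimal sketch of a nearest-rank percentile helper; the class name is hypothetical and the values in main are made-up inputs for illustration, not measurements from this job.

```java
import java.util.Arrays;

/** Hypothetical helper: nearest-rank percentiles over latency samples (ms). */
public class LatencyPercentiles {

    /** Smallest sample such that at least p percent of samples are <= it. */
    public static long percentile(long[] samplesMs, double p) {
        if (samplesMs.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need samples and 0 < p <= 100");
        }
        long[] sorted = samplesMs.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length); // 1-based rank
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        long[] samples = {12, 7, 9, 30, 8}; // illustrative values only
        System.out.println("p50 = " + percentile(samples, 50));
        System.out.println("max = " + percentile(samples, 100));
    }
}
```

Running the same helper over samples from both job variants gives comparable median and tail figures.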

My questions:

· Is the improved latency related to checkpointing being done separately on 
those streams?

· What are the downsides of duplication if it gives better latency? Are we 
harming state correctness in any way?

Additional info:
The two graph configurations appear exactly the same in the execution plan/web UI:

[Execution plan screenshots not available. Node labels: sourceOrders.keyBy, CoMapperA, OrdersStreams; sourcePrices.keyBy, CoMapperB, pricesStreams]


Code used for duplication:
(We duplicate the streams, then connect pricesStreamA with ordersStreamA and 
pricesStreamB with ordersStreamB, applying keyBy as part of connect, similar to 
above.)
// duplicate the prices stream: every price is routed to both named outputs
SplitStream<Price> pricesSplitStream = pricesStream
        .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));

DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");


Thanks,
Tovi