Re: KeyBy/Rebalance overhead?

2019-12-09 Thread Arvid Heise
Hi Komal,

as a general rule of thumb, you want to avoid network shuffles as much as
possible. As vino pointed out, you need to reshuffle if you need to group
by key. Another frequent use case is rebalancing data in case of a heavy
skew. Since neither applies to you, removing the keyBy is the best option.

If you want to retain it because you may experience skew in the future,
there are only a couple of things you can do. You may tinker with network
settings to get smaller/larger network buffers (smaller = less latency,
larger = more throughput) [1]. Of course, you get better results if you
have a faster network (running in the cloud, you can play around with
different adapters). You could also try whether fewer/more machines are
actually faster (fewer machines = less network traffic, more machines =
more compute power).

In any case, your data volume is so low that I would probably not optimize
too much. We are talking about seconds, and the times may vary greatly from
run to run because of the low data volume. If you want to test the
throughput as a POC for a larger volume, I'd either generate a larger
sample or replicate the existing one to get more reliable numbers. Either
way, try to have your final use case in mind when deciding on an option.
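For the replication approach, a hypothetical flatMap helper could look like
this (Replicate and the factor of 100 are made up for illustration):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Emit each record k times to simulate a larger data volume.
public class Replicate<T> implements FlatMapFunction<T, T> {
    private final int k;

    public Replicate(int k) {
        this.k = k;
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        for (int i = 0; i < k; i++) {
            out.collect(value);
        }
    }
}

// usage: the generic output type must be declared explicitly, e.g.
// myPoints.flatMap(new Replicate<>(100)).returns(myPoints.getType());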

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-the-network-buffers



Re: KeyBy/Rebalance overhead?

2019-12-09 Thread vino yang
Hi Komal,

Actually, the main factor in choosing the type of partitioning is your
business logic. If you want to do some aggregation logic based on a group,
you must choose keyBy to guarantee correct semantics.
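A minimal sketch (the (id, count) tuples are made up for illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                Tuple2.of("obj-1", 1), Tuple2.of("obj-2", 1), Tuple2.of("obj-1", 1))
           // keyBy routes all records with the same key to the same subtask ...
           .keyBy(0)
           // ... so this running per-key sum is guaranteed to be correct.
           .sum(1)
           .print();
        env.execute("keyBy aggregation sketch");
    }
}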

Best,
Vino



Re: KeyBy/Rebalance overhead?

2019-12-09 Thread Komal Mariam
Thank you @vino yang for the reply. I suspect keyBy will be beneficial in
those cases where my subsequent operators are computationally intensive,
i.e. their computation time is greater than the network reshuffling cost.

Regards,
Komal



Re: KeyBy/Rebalance overhead?

2019-12-08 Thread vino yang
Hi Komal,

keyBy (hash partitioning, a logical partitioning) and rebalance (physical
partitioning) are both among the partitioning strategies supported by
Flink. [1]

Generally speaking, partitioning may incur network communication (network
shuffle) costs, which add processing time. The example you provided may
benefit from operator chaining [2] if you remove the keyBy operation.
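A minimal sketch contrasting a rebalance()'d pipeline with a fully chained
one (the sequence numbers are made up for illustration):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // rebalance: physical round-robin redistribution, i.e. a network shuffle.
        env.generateSequence(0, 99)
           .rebalance()
           .map(n -> n * 2).returns(Types.LONG)
           .print();

        // Without repartitioning, source, map, and sink can be chained into a
        // single task, avoiding serialization and network transfer entirely.
        env.generateSequence(0, 99)
           .map(n -> n * 2).returns(Types.LONG)
           .print();

        env.execute("partitioning sketch");
    }
}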

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains



Re: KeyBy/Rebalance overhead?

2019-12-08 Thread Komal Mariam
Anyone?



KeyBy/Rebalance overhead?

2019-12-06 Thread Komal Mariam
Hello everyone,

I want to get some insights on the keyBy (and rebalance) operations, as
according to my understanding they partition our tasks over the defined
parallelism and thus should make our pipeline faster.

I am reading a topic which contains 170,000,000 pre-stored records, with 11
Kafka partitions and a replication factor of 1. Hence I use
.setStartFromEarliest() to read the stream.
My Flink setup is a 4-node cluster with 3 taskmanagers, each having 10
cores, and 1 jobmanager with 6 cores (10 task slots per TM, hence I set the
environment parallelism to 30).

There are about 10,000 object IDs, hence 10,000 keys. Right now I'm keeping
the number of records fixed to get a handle on how fast they're being
processed.

When I remove keyBy, I get the same results in 39 secs as opposed to 52
secs with keyBy. In fact, even when I vary the parallelism down to 10 or
below, I still get the same extra overhead of 9 to 13 secs. My data is
mostly uniformly distributed on its key, so I can rule out skew. Rebalance
likewise has the same latency as keyBy.

What I want to know is: what may be causing this overhead? And is there any
way to decrease it?

Here's the script I'm running for testing purposes:
--
DataStream<ObjectNode> JSONStream = env.addSource(new FlinkKafkaConsumer<>("data",
        new JSONKeyValueDeserializationSchema(false), properties)
        .setStartFromEarliest());

DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());

myPoints.keyBy("oID").filter(new findDistancefromPOI());

public class findDistancefromPOI extends RichFilterFunction<Point> {
    @Override
    public boolean filter(Point input) throws Exception {
        Double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
        return distance > 0;
    }
}
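(computeEuclideanDist isn't shown above; it's just the standard 2D
Euclidean distance, roughly:)

public static Double computeEuclideanDist(double x1, double y1, double x2, double y2) {
    // sqrt((x1 - x2)^2 + (y1 - y2)^2)
    return Math.sqrt(Math.pow(x1 - x2, 2) + Math.pow(y1 - y2, 2));
}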

Best Regards,
Komal