Hi Komal,

Actually, the choice of partitioning type depends mainly on your business
logic. If you want to perform aggregation logic on a per-group basis, you
must use keyBy to guarantee correct semantics.
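
For example, a minimal sketch of per-key aggregation (assuming a
hypothetical DataStream<Tuple2<String, Integer>> named "input"):

// keyBy hash-partitions the stream so that all records with the same key
// reach the same parallel instance; only then is the per-key sum correct.
DataStream<Tuple2<String, Integer>> sums = input
        .keyBy(t -> t.f0) // logical (hash) partitioning on the key
        .sum(1);          // per-key running sum of the second field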

Best,
Vino

Komal Mariam <komal.mar...@gmail.com> 于2019年12月9日周一 下午5:07写道:

> Thank you @vino yang <yanghua1...@gmail.com>  for the reply. I suspect
> keyBy will be beneficial in cases where my subsequent operators are
> computationally intensive, i.e. when their computation time exceeds the
> network reshuffling cost.
>
> Regards,
> Komal
>
> On Mon, 9 Dec 2019 at 15:23, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Komal,
>>
>> keyBy (hash partitioning, i.e. logical partitioning) and rebalance
>> (physical partitioning) are both partitioning strategies supported by
>> Flink. [1]
>>
>> Generally speaking, partitioning may incur network communication (network
>> shuffle) costs, which add processing time. The example you provided may
>> benefit from operator chaining [2] if you remove the keyBy operation.
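>>
>> For instance, reusing the class names from your script below (a minimal
>> sketch): without keyBy, the map and filter chain into a single task and
>> no data crosses the network.
>>
>> // chained: map -> filter execute in the same task, no network transfer
>> myPoints.filter(new findDistancefromPOI());
>>
>> // keyed: a hash shuffle is inserted between map and filter
>> myPoints.keyBy("oID").filter(new findDistancefromPOI());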
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>
>> Komal Mariam <komal.mar...@gmail.com> 于2019年12月9日周一 上午9:11写道:
>>
>>> Anyone?
>>>
>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com>
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I want to get some insights on the keyBy (and rebalance) operations:
>>>> as I understand them, they partition our tasks over the defined
>>>> parallelism and thus should make our pipeline faster.
>>>>
>>>> I am reading from a topic that contains 170,000,000 pre-stored records,
>>>> with 11 Kafka partitions and a replication factor of 1, so I use
>>>> .setStartFromEarliest() to read the stream from the beginning.
>>>> My Flink setup is a 4-node cluster with 3 task managers, each having 10
>>>> cores, and 1 job manager with 6 cores (10 task slots per TM, hence I
>>>> set the environment parallelism to 30).
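>>>>
>>>> In code, that setup corresponds roughly to the following minimal sketch
>>>> (the consumer itself is shown in the script further below):
>>>>
>>>> StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setParallelism(30); // 3 TMs x 10 slots each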
>>>>
>>>> There are about 10,000 object IDs, hence 10,000 keys. Right now I'm
>>>> keeping the number of records fixed to get a handle on how fast they're
>>>> being processed.
>>>>
>>>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>>>> secs with keyBy. In fact, even when I vary the parallelism down to 10
>>>> or below, I still get the same extra overhead of 9 to 13 secs. My data
>>>> is mostly uniformly distributed on its key, so I can rule out skew.
>>>> Rebalance likewise has the same latency as keyBy.
>>>>
>>>> What I want to know is: what may be causing this overhead, and is
>>>> there any way to decrease it?
>>>>
>>>> Here's the script I'm running for testing purposes:
>>>> --------------
>>>> // consume the pre-stored records from the beginning of the topic
>>>> DataStream<ObjectNode> JSONStream = env.addSource(
>>>>         new FlinkKafkaConsumer<>("data",
>>>>                 new JSONKeyValueDeserializationSchema(false),
>>>>                 properties).setStartFromEarliest());
>>>>
>>>> // convert each JSON record into a Point POJO
>>>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>>>
>>>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>>>
>>>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>>>     @Override
>>>>     public boolean filter(Point input) throws Exception {
>>>>         // computeEuclideanDist is a helper defined elsewhere
>>>>         double distance = computeEuclideanDist(16.4199, 89.974,
>>>>                 input.X(), input.Y());
>>>>         return distance > 0;
>>>>     }
>>>> }
>>>>
>>>> Best Regards,
>>>> Komal
>>>>
>>>
