Thank you so much for the detailed reply. I understand the usage of keyBy
a lot better now. You are correct about the time variation too. We will
apply different network settings and extend our datasets to check
performance on different use cases.

On Mon, 9 Dec 2019 at 20:45, Arvid Heise <ar...@ververica.com> wrote:

> Hi Komal,
>
> as a general rule of thumb, you want to avoid network shuffles as much as
> possible. As vino pointed out, you need to reshuffle if you need to group
> by key. Another frequent use case is rebalancing the data in case of a
> heavy skew. Since neither applies to you, removing the keyBy is the best
> option.
>
> If you want to retain it, because you may experience skew in the future,
> there are only a couple of things you can do. You may tinker with the
> network settings to use smaller/larger network buffers (smaller = less
> latency, larger = more throughput) [1]. Of course, you get better results
> if you have a faster network (running in the cloud, you can play around
> with different adapters). You could also test whether fewer or more
> machines are actually faster (fewer machines = less network traffic, more
> machines = more compute power).
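>
> To make the buffer tuning concrete, here is a rough sketch of the kind of
> entries you could adjust in flink-conf.yaml; the values below are made up
> and the exact keys and defaults depend on your Flink version, so please
> double-check them against [1] before using them:
>
> # smaller segments = lower latency, larger segments = higher throughput
> taskmanager.memory.segment-size: 16kb
> # share of TaskManager memory reserved for network buffers
> taskmanager.network.memory.fraction: 0.1
> taskmanager.network.memory.min: 64mb
> taskmanager.network.memory.max: 1gb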
>
> In any case, your data volume is so low that I would probably not optimize
> too much. We are talking about seconds, and the times may vary considerably
> from run to run because of the low data volume. If you want to test the
> throughput as a POC for a larger volume, I'd either generate a larger
> sample or replicate the existing data to get more reliable numbers. Either
> way, try to have your final use case in mind when deciding on an option.
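>
> As an illustration only (Point and myPoints are taken from your snippet
> further down, and the replication factor is made up), you could inflate the
> stream itself for such a throughput test:
>
> final int replicationFactor = 10; // made-up factor
> DataStream<Point> inflated = myPoints
>     .flatMap((Point p, Collector<Point> out) -> {
>         // emit each record several times so downstream operators see more load
>         for (int i = 0; i < replicationFactor; i++) {
>             out.collect(p);
>         }
>     })
>     .returns(Point.class); // lambdas with a generic Collector need explicit type info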
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-the-network-buffers
>
> On Mon, Dec 9, 2019 at 10:25 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Komal,
>>
>> Actually, the main factor in choosing the partition type is your business
>> logic. If you want to do some aggregation logic based on a group, you must
>> use keyBy to guarantee correct semantics.
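>>
>> As a (hypothetical) example reusing the Point/oID names from your snippet
>> below: a per-key count only gives correct results after a keyBy, because
>> every record with the same key must reach the same parallel subtask:
>>
>> myPoints
>>     .map(p -> new Tuple2<>(p.getOID(), 1))         // getOID() is assumed here
>>     .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas need explicit type info
>>     .keyBy(0)                                      // group by object ID
>>     .sum(1);                                       // running count per key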
>>
>> Best,
>> Vino
>>
>> On Mon, 9 Dec 2019 at 17:07, Komal Mariam <komal.mar...@gmail.com> wrote:
>>
>>> Thank you @vino yang <yanghua1...@gmail.com> for the reply. I suspect
>>> keyBy will be beneficial in those cases where my subsequent operators are
>>> computationally intensive, i.e. their computation time is greater than the
>>> network reshuffling cost.
>>>
>>> Regards,
>>> Komal
>>>
>>> On Mon, 9 Dec 2019 at 15:23, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Komal,
>>>>
>>>> keyBy (hash partitioning, i.e. logical partitioning) and rebalance
>>>> (physical partitioning) are both partitioning strategies supported by
>>>> Flink. [1]
>>>>
>>>> Generally speaking, partitioning may cause network communication (network
>>>> shuffle) costs, which may add time. The example you provided may benefit
>>>> from operator chaining [2] if you remove the keyBy operation.
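>>>>
>>>> To illustrate with the classes from your script below: without the keyBy,
>>>> the map and the filter can be chained into a single task, so records are
>>>> passed along as plain method calls instead of being serialized and sent
>>>> over the network (assuming both operators keep the same parallelism and
>>>> chaining is not disabled):
>>>>
>>>> JSONStream
>>>>     .map(new jsonToPoint())              // these two operators can be
>>>>     .filter(new findDistancefromPOI());  // chained into one task/thread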
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
>>>> [2]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>>>>
>>>> On Mon, 9 Dec 2019 at 09:11, Komal Mariam <komal.mar...@gmail.com> wrote:
>>>>
>>>>> Anyone?
>>>>>
>>>>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> I want to get some insights on the keyBy (and rebalance) operations,
>>>>>> as according to my understanding they partition our data over the
>>>>>> defined parallelism and thus should make our pipeline faster.
>>>>>>
>>>>>> I am reading a topic which contains 170,000,000 pre-stored records,
>>>>>> with 11 Kafka partitions and a replication factor of 1. Hence I use
>>>>>> .setStartFromEarliest() to read the stream.
>>>>>> My Flink setup is a 4-node cluster with 3 taskmanagers, each having 10
>>>>>> cores, and 1 job manager with 6 cores. (10 task slots per TM, hence I
>>>>>> set the environment parallelism to 30.)
>>>>>>
>>>>>> There are about 10,000 object IDs, hence 10,000 keys. Right now I'm
>>>>>> keeping the number of records fixed to get a handle on how fast they're
>>>>>> being processed.
>>>>>>
>>>>>> When I remove keyBy, I get the same results in 39 secs as opposed to
>>>>>> 52 secs with keyBy. In fact, even when I vary the parallelism down to 10
>>>>>> or below, I still get the same extra overhead of 9 to 13 secs. My data
>>>>>> is mostly uniformly distributed on its key, so I can rule out skew.
>>>>>> Rebalance likewise has the same latency as keyBy.
>>>>>>
>>>>>> What I want to know is: what may be causing this overhead? And is
>>>>>> there any way to decrease it?
>>>>>>
>>>>>> Here's the script I'm running for testing purposes:
>>>>>> --------------
>>>>>> DataStream<ObjectNode> JSONStream = env.addSource(new
>>>>>> FlinkKafkaConsumer<>("data", new
>>>>>> JSONKeyValueDeserializationSchema(false), properties).setStartFromEarliest());
>>>>>>
>>>>>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>>>>>
>>>>>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>>>>>
>>>>>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>>>>>     @Override
>>>>>>     public boolean filter(Point input) throws Exception {
>>>>>>         double distance = computeEuclideanDist(
>>>>>>                 16.4199, 89.974, input.X(), input.Y());
>>>>>>         return distance > 0;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Best Regards,
>>>>>> Komal
>>>>>>
>>>>>
