Yes. A discussion with people who know this better than I do would be very welcome.

Basically, I am trying to implement issue FLINK-1725 [1], which was abandoned
in March 2017. Stephan Ewen said that other issues need to be fixed before
this can be implemented, and I don't really know which ones they are.

[1] https://issues.apache.org/jira/browse/FLINK-1725

Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*


On Mon, Sep 23, 2019 at 3:47 PM Biao Liu <mmyy1...@gmail.com> wrote:

> Wow, that's really cool! You have indeed done a lot of work. IMO
> it's somewhat beyond the scope of the user group.
>
> Just one small concern: I'm not sure I have fully understood your approach to
> "tackle data skew by altering the way Flink partitions keys using
> KeyedStream".
>
> From my understanding, key-groups are used for rescaling a job, for example
> to support reusing state after changing the parallelism of an operator.
> I'm not sure whether you are heading in the right direction. It seems that
> you are implementing something deeper than the user interface. The user
> interface is stable, while the implementation is not. Usually it's not
> recommended to build a feature on implementation details.
>
> If you have strong reasons to change the implementation, I would suggest
> starting a discussion on the dev mailing list. Maybe it could be supported
> officially. What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Mon, 23 Sep 2019 at 20:54, Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>>
>> I've implemented a combiner [1] in Flink by extending
>> OneInputStreamOperator. I call my operator using "transform".
>> It works well, and I guess it would be useful to add this operator to
>> DataStream.java. I just need to check whether I have to touch other parts
>> of the source code.
>>
>> But now I want to tackle data skew by altering the way Flink partitions
>> keys using KeyedStream.
>>
>> [1]
>> https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html
>>
>>
>>
>> On Mon, Sep 23, 2019 at 2:37 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> If I understand correctly, you want to solve data skew caused by
>>> imbalanced keys?
>>>
>>> There is a common strategy for this kind of problem: pre-aggregation,
>>> like the combiner in MapReduce.
>>> But sadly, AFAIK Flink does not currently support pre-aggregation. I'm
>>> afraid you have to implement it yourself.
>>>
>>> For example, you could introduce a function that caches some data (time- or
>>> count-based). This function should come before "keyBy", on a non-keyed
>>> stream. It does the same pre-aggregation as the aggregation after
>>> "keyBy". In this way, the skewed keyed data would be reduced a lot.
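
A minimal sketch of the count-based variant of this idea, written in plain Java rather than against the Flink API so that it runs on its own. The class name `PreAggregationSketch` and the `maxBuffered` threshold are hypothetical; in a real job this logic would sit in an operator on the non-keyed stream before "keyBy", and it would also need a time-based flush so that partial sums are not held back indefinitely.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a count-based combiner; NOT a Flink operator. */
final class PreAggregationSketch {
    private final int maxBuffered;                        // flush threshold (hypothetical parameter)
    private final Map<String, Integer> buffer = new HashMap<>();
    private int buffered = 0;                             // records seen since the last flush

    PreAggregationSketch(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    /** Adds one (word, count) record; returns partial sums to emit, or an empty map. */
    Map<String, Integer> add(String word, int count) {
        buffer.merge(word, count, Integer::sum);
        buffered++;
        return buffered >= maxBuffered ? flush() : Collections.emptyMap();
    }

    /** Emits the buffered partial sums and clears the buffer. */
    Map<String, Integer> flush() {
        Map<String, Integer> out = new HashMap<>(buffer);
        buffer.clear();
        buffered = 0;
        return out;
    }

    public static void main(String[] args) {
        PreAggregationSketch combiner = new PreAggregationSketch(4);
        String[] words = {"flink", "flink", "flink", "skew", "flink", "flink"};
        int emitted = 0;
        for (String w : words) {
            emitted += combiner.add(w, 1).size();
        }
        emitted += combiner.flush().size();               // drain what is left at end of input
        System.out.println(words.length + " input records -> " + emitted + " records sent downstream");
    }
}
```

The downstream aggregation still has to sum the partial counts per key; the sketch only reduces how many records for a hot key cross the network.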
>>>
>>> I also found a suggestion [1] from Fabian, although it's from a long time ago.
>>>
>>> Hope it helps.
>>>
>>> [1]
>>> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Thanks Biao,
>>>>
>>>> I see. To achieve what I want, I need to work with KeyedStream. I
>>>> downloaded the Flink source code to study KeyedStream and alter it to my
>>>> needs. I am not sure, but it looks like a lot of work because, as far as I
>>>> understand, the key-groups have to be predictable [1], and altering this
>>>> touches a lot of other parts of the source code.
>>>>
>>>> However, if I guarantee that the key-groups are predictable, I will
>>>> be able to rebalance, rescale, etc. the keys to other worker nodes.
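
To make "predictable" concrete, here is a small plain-Java model of the routing the rescalable-state article describes: a key is hashed to a key-group, and each key-group belongs to one subtask in a contiguous range. This is a sketch under assumptions, not Flink code. The `mix` function is a stand-in, not the hash Flink uses, and the range formula reflects my understanding of Flink's key-group assignment and should be checked against the source.

```java
/** Plain-Java model of key -> key-group -> subtask routing; NOT Flink code. */
final class KeyGroupSketch {

    /** Stand-in bit mixer (an assumption; Flink applies its own hash). */
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    /** Deterministic: depends only on the key and on maxParallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /** Key-groups are split into contiguous ranges, one range per subtask. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;                         // number of key-groups (example value)
        for (String key : new String[] {"flink", "skew", "the"}) {
            int group = keyGroupFor(key, maxParallelism);
            System.out.println(key + " -> key-group " + group
                + ", subtask " + subtaskFor(group, maxParallelism, 2) + " of 2"
                + ", subtask " + subtaskFor(group, maxParallelism, 4) + " of 4");
        }
    }
}
```

Because the key-group of a key never changes, state can be moved between subtasks one key-group at a time when the parallelism changes. The flip side is that a hot key always lands in the same key-group, so this routing alone cannot spread it out.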
>>>>
>>>> [1]
>>>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>>>
>>>> Thanks,
>>>> Felipe
>>>>
>>>>
>>>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>>> Hi Felipe,
>>>>>
>>>>> A Flink job graph is DAG-based. It seems that you set an "edge property"
>>>>> (the partitioner) several times.
>>>>> Flink does not support multiple partitioners on one edge. The later
>>>>> one overrides the earlier ones. That means the "keyBy" overrides the
>>>>> "rebalance" and "partitionByPartial".
>>>>>
>>>>> You could insert some nodes between these partitioners to satisfy your
>>>>> requirement. For example,
>>>>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>>>>>
>>>>> Thanks,
>>>>> Biao /'bɪ.aʊ/
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> I am running a data stream application that uses rebalance.
>>>>>> Basically, I am counting words using "src -> split ->
>>>>>> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3
>>>>>> examples: one without a physical partition strategy, one with the
>>>>>> rebalance strategy [1], and one with the partial partition strategy from [2].
>>>>>> I know that the keyBy operator effectively undoes what rebalance does
>>>>>> because it splits the stream by key, and if the stream has a skewed key,
>>>>>> one parallel instance of the operator after the keyBy will be overloaded.
>>>>>> However, I was expecting that *before the keyBy* I would have a
>>>>>> balanced stream, which is not happening.
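
The overload described here can be illustrated with a small plain-Java simulation, independent of Flink. It counts how many records each of 4 parallel instances would receive under round-robin distribution versus hashing by key. The class name, the 80% share of the hot word and the use of `String.hashCode()` are assumptions made for the example only.

```java
import java.util.Arrays;

/** Plain-Java simulation of per-instance load; NOT Flink code. */
final class SkewSketch {

    /** Round-robin, in the spirit of rebalance(): the key is ignored. */
    static int[] roundRobin(String[] records, int parallelism) {
        int[] load = new int[parallelism];
        for (int i = 0; i < records.length; i++) {
            load[i % parallelism]++;
        }
        return load;
    }

    /** Hash by key, in the spirit of keyBy(): equal keys share one instance. */
    static int[] byKeyHash(String[] records, int parallelism) {
        int[] load = new int[parallelism];
        for (String r : records) {
            load[Math.floorMod(r.hashCode(), parallelism)]++;
        }
        return load;
    }

    /** Builds a skewed input: 4 out of every 5 records are the word "the". */
    static String[] skewedWords(int n) {
        String[] out = new String[n];
        for (int i = 0; i < n; i++) {
            out[i] = (i % 5 == 0) ? "word" + i : "the";
        }
        return out;
    }

    public static void main(String[] args) {
        String[] records = skewedWords(1000);
        System.out.println("round-robin: " + Arrays.toString(roundRobin(records, 4)));
        System.out.println("by key hash: " + Arrays.toString(byKeyHash(records, 4)));
    }
}
```

With this input, round-robin gives every instance the same number of records, while hashing by key sends all copies of the hot word to a single instance.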
>>>>>>
>>>>>> Basically, I want to see the difference in records/sec between
>>>>>> operators when I use rebalance or any other physical partition strategy.
>>>>>> However, I found no difference in the records/sec metrics of any
>>>>>> operator across the 3 different physical partition strategies.
>>>>>> Screenshots of Prometheus+Grafana are attached.
>>>>>>
>>>>>> Maybe I am measuring the wrong operator, or maybe I am not using
>>>>>> rebalance in the right way, or my use case is not a good test of the
>>>>>> rebalance transformation.
>>>>>> I am also testing a different physical partition strategy so that I can
>>>>>> later try to implement the issue "FLINK-1725 New Partitioner for better
>>>>>> load balancing for skewed data" [2]. I am not sure, but I guess that all
>>>>>> physical partition strategies have to be implemented on a KeyedStream.
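
One possible reading of a "partial" strategy for skewed data is a two-choices scheme: each key has two candidate channels, and every record goes to whichever of the two has received fewer records so far. The sketch below models that in plain Java. It is an interpretation, not something stated in this thread or taken from the Flink code base, and the class name, the second hash and the local load counters are all hypothetical.

```java
import java.util.Arrays;

/** Plain-Java model of a two-choices channel selector; NOT a Flink partitioner. */
final class TwoChoicesSketch {
    private final long[] load;                            // records sent to each channel so far

    TwoChoicesSketch(int channels) {
        if (channels < 2) {
            throw new IllegalArgumentException("need at least 2 channels");
        }
        this.load = new long[channels];
    }

    /** Picks the less loaded of the key's two candidate channels. */
    int selectChannel(Object key) {
        int n = load.length;
        int h = key.hashCode();
        int first = Math.floorMod(h, n);
        // Second candidate: an offset in [1, n-1], so it always differs from the first.
        int offset = 1 + Math.floorMod(Integer.rotateLeft(h, 15) * 0x9E3779B9, n - 1);
        int second = (first + offset) % n;
        int chosen = load[first] <= load[second] ? first : second;
        load[chosen]++;
        return chosen;
    }

    long[] loads() {
        return load.clone();
    }

    public static void main(String[] args) {
        TwoChoicesSketch selector = new TwoChoicesSketch(4);
        for (int i = 0; i < 1000; i++) {
            selector.selectChannel("the");                // a single hot key
        }
        System.out.println("load per channel: " + Arrays.toString(selector.loads()));
    }
}
```

The trade-off is that one key is now spread over two instances, so the per-key result is only partial and a second aggregation step is needed to merge the two halves.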
>>>>>>
>>>>>> DataStream<String> text = env.addSource(new WordSource());
>>>>>> // split lines into words
>>>>>> DataStream<Tuple2<String, Integer>> tokenizer =
>>>>>>     text.flatMap(new Tokenizer());
>>>>>> // choose ONE partitioning strategy (the three runs differ only here)
>>>>>> // run 1: no physical partitioning
>>>>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;
>>>>>> // run 2: rebalance [1]
>>>>>> // DataStream<Tuple2<String, Integer>> partitionedStream =
>>>>>> //     tokenizer.rebalance();
>>>>>> // run 3: partial partitioning, from [2]
>>>>>> // DataStream<Tuple2<String, Integer>> partitionedStream =
>>>>>> //     tokenizer.partitionByPartial(0);
>>>>>> // count words
>>>>>> partitionedStream.keyBy(0).sum(1).print();
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>>>>
>>>>>> thanks,
>>>>>> Felipe
>>>>>>
>>>>>>
>>>>>
