thanks All for your suggestions!

I am not sure if the option 3 that Fabian said I will need to change the
Flink source code or it can be implemented on top of Flink.
-------------------------
3) One approach to improve the processing of skewed data, is to change how
keyed state is handled.
Flink's keyed state is partitioned in two steps:
1. each key is assigned to a key group based on an internal hash function.
2. each key group is assigned to and processed by a parallel operator task.
For full control over data placement, you need to control both.
Changing 1) is tricky because it affects savepoint compatibility.
Changing 2) does not help if two hot keys are assigned to the same keyed
state.
-------------------------
I did an experiment
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeySkewedDAG.java#L66>
with a Mapper function
<https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/udfs/StationPlatformSkewedKeyMapper.java>
that maps to a key with one more parameter (a skew parameter). The results
are better.

Integer skewParameter = 0;
if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3)))
{ // this is the skewed key
skewParameter = this.skewParameterGenerator.getNextItem();
}
CompositeSkewedKeyStationPlatform compositeKey = new
CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);

But it is still a static solution =(. I mean, the developer has to set on
the Mapper which key is skewed.

Best,
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Thu, Apr 11, 2019 at 1:49 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Just a small addition:
>
> If two hot keys fall into two key groups which are being processed by the
> same TM, then it could help to change the parallelism, because then the key
> group mapping might be different.
>
> If two hot keys fall into the same key group, you can adjust the max
> parallelism which defines how many key groups will be used. By changing the
> number, it might happen that the two hot keys fall into different key
> groups.
>
> Cheers,
> Till
>
> On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> three comments:
>>
>> 1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no
>> effect:
>> keyBy() introduces a hash partitioning such that any data partitioning
>> that you do immediately before keyBy() is destroyed.
>> You only change the distribution for the call of the key extractor which
>> should be a lightweight function anyway.
>> That's why you do not see any difference between the three methods.
>>
>> 2) windowAll() defines a non-keyed window over the whole stream.
>> All records are processed by the same non-parallel instance of the window
>> operator.
>> That's why assigning a higher parallelism to that operator does not help.
>>
>> 3) One approach to improve the processing of skewed data, is to change
>> how keyed state is handled.
>> Flink's keyed state is partitioned in two steps:
>> 1. each key is assigned to a key group based on an internal hash
>> function.
>> 2. each key group is assigned to and processed by a parallel operator
>> task.
>> For full control over data placement, you need to control both.
>> Changing 1) is tricky because it affects savepoint compatibility.
>> Changing 2) does not help if two hot keys are assigned to the same keyed
>> state.
>>
>> Best, Fabian
>>
>> Am Mi., 10. Apr. 2019 um 11:50 Uhr schrieb Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am studying data skew processing in Flink and how I can change the
>>> low-level control of physical partition (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
>>> in order to have an even processing of tuples. I have created synthetic
>>> skewed data sources and I aim to process (aggregate) them over a window.
>>> Here is the complete code:
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
>>>
>>> streamTrainsStation01.union(streamTrainsStation02)
>>> .union(streamTicketsStation01).union(streamTicketsStation02)
>>> // map the keys
>>> .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>> .rebalance() // or .rescale() .shuffle()
>>> .keyBy(new StationPlatformKeySelector())
>>> .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>> .apply(new
>>> StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>>> .setParallelism(4)
>>> .map(new
>>> StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>> .addSink(new MqttStationPlatformPublisher(ipAddressSink,
>>> topic)).name(metricSinkFunction)
>>> ;
>>>
>>> According to the Flink dashboard I could not see too much difference
>>> among .shuffle(), .rescale(), and .rebalance(). Even though the
>>> documentation says rebalance() transformation is more suitable for data
>>> skew.
>>>
>>> After that I tried to use .partitionCustom(partitioner, "someKey").
>>> However, for my surprise, I could not use setParallelism(4) on the window
>>> operation. The documentation says "Note: This operation is inherently
>>> non-parallel since all elements have to pass through the same operator
>>> instance.". I did not understand why. If I am allowed to do partitionCustom
>>> why can't I use parallelism after that?
>>> Here is the complete code:
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74
>>>
>>> streamTrainsStation01.union(streamTrainsStation02)
>>> .union(streamTicketsStation01).union(streamTicketsStation02)
>>> // map the keys
>>> .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>> .partitionCustom(new StationPlatformKeyCustomPartitioner(), new
>>> StationPlatformKeySelector())
>>> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>> .apply(new
>>> StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
>>> .map(new
>>> StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>> .addSink(new MqttStationPlatformPublisher(ipAddressSink,
>>> topic)).name(metricSinkFunction)
>>> ;
>>>
>>> Thanks,
>>> Felipe
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>

Reply via email to