I guess I could implement a non-static solution as a custom operator that
implements Flink's OneInputStreamOperator interface:
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedCombinerByKeySkewedDAG.java#L84
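
To make "not static" concrete, here is a minimal plain-Java sketch of the idea, independent of any Flink API and not taken from the linked code. The class DynamicSkewSalter and its parameters (hotFraction, fanOut, warmUp) are hypothetical names chosen for illustration. It counts keys as they arrive and only starts handing out a round-robin skew parameter once a key exceeds a configurable share of all records, so nobody has to hard-code which key is hot.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): salts keys that turn out to be hot at runtime. */
public class DynamicSkewSalter {
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Integer> nextSalt = new HashMap<>();
    private final double hotFraction; // a key is "hot" once it exceeds this share of all records
    private final int fanOut;         // number of sub-keys a hot key is spread over
    private final long warmUp;        // records to observe before any key can be declared hot
    private long total = 0;

    public DynamicSkewSalter(double hotFraction, int fanOut, long warmUp) {
        if (fanOut < 1) {
            throw new IllegalArgumentException("fanOut must be at least 1");
        }
        this.hotFraction = hotFraction;
        this.fanOut = fanOut;
        this.warmUp = warmUp;
    }

    /** Returns 0 for keys that are not hot and a round-robin value in [0, fanOut) for hot keys. */
    public int skewParameterFor(String key) {
        total++;
        long seen = counts.merge(key, 1L, Long::sum);
        if (total < warmUp || (double) seen / total <= hotFraction) {
            return 0; // not enough evidence yet, or the key is not hot
        }
        // Advance a per-key counter so consecutive records of a hot key get different salts.
        return nextSalt.merge(key, 1, Integer::sum) % fanOut;
    }
}
```

The counters in this sketch grow without bound and never decay, so a real operator would probably need a bounded or windowed frequency estimate instead.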

Best,
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Thu, Apr 11, 2019 at 2:21 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Thanks, all, for your suggestions!
>
> I am not sure whether option 3 that Fabian described requires changing the
> Flink source code or whether it can be implemented on top of Flink.
> -------------------------
> 3) One approach to improve the processing of skewed data is to change how
> keyed state is handled.
> Flink's keyed state is partitioned in two steps:
> 1. each key is assigned to a key group based on an internal hash function.
> 2. each key group is assigned to and processed by a parallel operator task.
> For full control over data placement, you need to control both.
> Changing 1) is tricky because it affects savepoint compatibility.
> Changing 2) does not help if two hot keys are assigned to the same key
> group.
> -------------------------
> I ran an experiment
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeySkewedDAG.java#L66>
> with a map function
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/udfs/StationPlatformSkewedKeyMapper.java>
> that adds one more field (a skew parameter) to the key. The results
> are better.
>
> // Only the known hot key (station 2, platform 3) gets a varying skew
> // parameter; every other key keeps 0.
> Integer skewParameter = 0;
> if (stationId.equals(2) && platformId.equals(3)) {
>     skewParameter = this.skewParameterGenerator.getNextItem();
> }
> CompositeSkewedKeyStationPlatform compositeKey =
>     new CompositeSkewedKeyStationPlatform(stationId, platformId, skewParameter);
>
> But it is still a static solution =(. I mean, the developer has to hard-code
> in the mapper which key is skewed.
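
One consequence of adding a skew parameter to the key is that each window now produces several partial results for the hot key, one per skew value, and these have to be merged again downstream. The following plain-Java sketch illustrates that two-phase pattern with simple counts. It is not the code from the linked repository; the class SaltedAggregationSketch, the "#salt" key suffix, and the method names are assumptions made for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of aggregating on a salted key and merging afterwards. */
public class SaltedAggregationSketch {

    /** Phase 1: count records per salted key; only hotKey is spread over fanOut salts. */
    static Map<String, Long> partialCounts(String[] keys, String hotKey, int fanOut) {
        Map<String, Long> partial = new TreeMap<>();
        int next = 0;
        for (String key : keys) {
            int salt = key.equals(hotKey) ? (next++ % fanOut) : 0;
            partial.merge(key + "#" + salt, 1L, Long::sum);
        }
        return partial;
    }

    /** Phase 2: strip the salt and add the partial counts of the same original key. */
    static Map<String, Long> mergeCounts(Map<String, Long> partial) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map.Entry<String, Long> entry : partial.entrySet()) {
            String salted = entry.getKey();
            String original = salted.substring(0, salted.lastIndexOf('#'));
            merged.merge(original, entry.getValue(), Long::sum);
        }
        return merged;
    }
}
```

This only works directly for aggregates that can be combined from partial results, such as counts, sums, minimums, and maximums.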
>
> Best,
> Felipe
>
>
> On Thu, Apr 11, 2019 at 1:49 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Just a small addition:
>>
>> If two hot keys fall into two key groups which are being processed by the
>> same TM, then it could help to change the parallelism, because then the key
>> group mapping might be different.
>>
>> If two hot keys fall into the same key group, you can adjust the max
>> parallelism which defines how many key groups will be used. By changing the
>> number, it might happen that the two hot keys fall into different key
>> groups.
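
The two-step mapping discussed here can be sketched in plain Java. To my understanding, Flink derives the key group from a murmur hash of key.hashCode() modulo the max parallelism, and the task index as keyGroup * parallelism / maxParallelism (see KeyGroupRangeAssignment in the Flink sources). The mix() function below is only a stand-in bit mixer and is an assumption of this sketch, so the concrete numbers it yields will differ from what Flink computes; the class and method names are hypothetical as well.

```java
/** Hypothetical sketch of key -> key group -> parallel task; not Flink's actual classes. */
public class KeyGroupSketch {

    /** Stand-in bit mixer (assumption); Flink applies its own murmur hash here. */
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        h *= 0xc2b2ae35;
        h ^= (h >>> 16);
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo stays non-negative
    }

    /** Step 1: key -> key group in [0, maxParallelism). Depends only on maxParallelism. */
    static int keyGroup(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Step 2: key group -> parallel task index in [0, parallelism). */
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Formats the placement of one key for a given configuration. */
    static String describe(Object key, int maxParallelism, int parallelism) {
        int group = keyGroup(key, maxParallelism);
        int task = operatorIndex(group, maxParallelism, parallelism);
        return key + " -> key group " + group + " -> task " + task;
    }
}
```

Calling describe() for two hot keys with, say, maxParallelism 128 versus 256 and parallelism 4 versus 6 shows both effects: changing the parallelism only changes step 2, while changing the max parallelism changes step 1 and can therefore separate two keys that shared a key group.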
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 11, 2019 at 9:22 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> three comments:
>>>
>>> 1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no
>>> effect:
>>> keyBy() introduces a hash partitioning such that any data partitioning
>>> that you do immediately before keyBy() is destroyed.
>>> You only change the distribution for the call of the key extractor which
>>> should be a lightweight function anyway.
>>> That's why you do not see any difference between the three methods.
>>>
>>> 2) windowAll() defines a non-keyed window over the whole stream.
>>> All records are processed by the same non-parallel instance of the
>>> window operator.
>>> That's why assigning a higher parallelism to that operator does not help.
>>>
>>> 3) One approach to improve the processing of skewed data is to change
>>> how keyed state is handled.
>>> Flink's keyed state is partitioned in two steps:
>>> 1. each key is assigned to a key group based on an internal hash
>>> function.
>>> 2. each key group is assigned to and processed by a parallel operator
>>> task.
>>> For full control over data placement, you need to control both.
>>> Changing 1) is tricky because it affects savepoint compatibility.
>>> Changing 2) does not help if two hot keys are assigned to the same key
>>> group.
>>>
>>> Best, Fabian
>>>
>>> On Wed, Apr 10, 2019 at 11:50 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am studying how Flink processes skewed data and how I can use the
>>>> low-level physical partitioning operations (
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning)
>>>> to distribute the processing of tuples evenly. I have created synthetic
>>>> skewed data sources and I aim to process (aggregate) them over a window.
>>>> Here is the complete code:
>>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedPartitionByKeyDAG.java#L61
>>>>
>>>> streamTrainsStation01.union(streamTrainsStation02)
>>>>     .union(streamTicketsStation01).union(streamTicketsStation02)
>>>>     // map the keys
>>>>     .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>>>     .rebalance() // or .rescale() or .shuffle()
>>>>     .keyBy(new StationPlatformKeySelector())
>>>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>>>     .apply(new StationPlatformRichWindowFunction(metricWindowFunction))
>>>>     .name(metricWindowFunction)
>>>>     .setParallelism(4)
>>>>     .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>>>     .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic))
>>>>     .name(metricSinkFunction);
>>>>
>>>> According to the Flink dashboard, I could not see much difference
>>>> among .shuffle(), .rescale(), and .rebalance(), even though the
>>>> documentation says the rebalance() transformation is more suitable for
>>>> data skew.
>>>>
>>>> After that I tried to use .partitionCustom(partitioner, "someKey").
>>>> However, to my surprise, I could not use setParallelism(4) on the window
>>>> operation. The documentation says "Note: This operation is inherently
>>>> non-parallel since all elements have to pass through the same operator
>>>> instance." I did not understand why. If I am allowed to use
>>>> partitionCustom, why can't I set a parallelism after that?
>>>> Here is the complete code:
>>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MqttSensorDataSkewedRescaleByKeyDAG.java#L74
>>>>
>>>> streamTrainsStation01.union(streamTrainsStation02)
>>>>     .union(streamTicketsStation01).union(streamTicketsStation02)
>>>>     // map the keys
>>>>     .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
>>>>     .partitionCustom(new StationPlatformKeyCustomPartitioner(),
>>>>         new StationPlatformKeySelector())
>>>>     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
>>>>     .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction))
>>>>     .name(metricWindowFunction)
>>>>     .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
>>>>     .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic))
>>>>     .name(metricSinkFunction);
>>>>
>>>> Thanks,
>>>> Felipe
>>>>
>>>
