Hi Akshay,

I don’t know much about the Beam/Flink integration, but I’m curious why you 
end up with a single record that contains all 8M values for the same key.

E.g. the code for your simple “group by” test would be interesting.

— Ken


> On Nov 22, 2018, at 10:54 AM, Akshay Mendole <akshaymend...@gmail.com> wrote:
> 
> Hi,
>     Thanks for your reply. To check whether this issue is specific to joins, 
> I tried running a simple "group by" on just one dataset, with no joins, where 
> a few keys occur repeatedly (on the order of millions of times). As I 
> expected, I ran into the same issue. I am giving 7 GB to each task manager, 
> with 2 slots per task manager. My understanding so far is that when 
> individual records somewhere in the pipeline become this large, they need to 
> be handled in a distributed manner rather than by a simple data structure in 
> a single JVM, and I am guessing there is no way to do this in Flink today.
> Could you please confirm this? For reference, the test was roughly of the 
> shape sketched below.
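> 
> A minimal sketch of the test (simplified; the real job reads its input from 
> HDFS and the key/value types differ, so treat all names here as 
> placeholders):
> 
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>     import org.apache.beam.sdk.transforms.Create;
>     import org.apache.beam.sdk.transforms.GroupByKey;
>     import org.apache.beam.sdk.values.KV;
>     import org.apache.beam.sdk.values.PCollection;
> 
>     public class GroupByTest {
>       public static void main(String[] args) {
>         Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
> 
>         // Stand-in for the HDFS read; the hot key repeats millions of times.
>         PCollection<KV<String, String>> records =
>             p.apply(Create.of(KV.of("hotKey", "v1"), KV.of("hotKey", "v2")));
> 
>         // All values of a hot key come back as ONE grouped element, which
>         // the Flink batch runner then serializes as a single record.
>         PCollection<KV<String, Iterable<String>>> grouped =
>             records.apply(GroupByKey.<String, String>create());
> 
>         p.run().waitUntilFinish();
>       }
>     }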
> Thanks,
> Akshay
> 
> 
> On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann <trohrm...@apache.org 
> <mailto:trohrm...@apache.org>> wrote:
> Hi Akshay,
> 
> Flink currently does not support automatically distributing hot keys across 
> different JVMs. What you can do is adapt the parallelism/number of partitions 
> manually if you find that one partition contains a lot of hot keys. This 
> might mitigate the problem by spreading the hot keys across different 
> partitions; see the sketch below for one manual way to do this.
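> 
> One common manual technique here is key "salting": append a random suffix to 
> each key, pre-aggregate the salted keys, then strip the suffix and aggregate 
> again. A sketch in Beam (the fan-out of 100 and all names are made up, and 
> this only works when the aggregation is associative and commutative, e.g. a 
> sum; it does not directly apply to a join):
> 
>     import java.util.concurrent.ThreadLocalRandom;
>     import org.apache.beam.sdk.transforms.MapElements;
>     import org.apache.beam.sdk.transforms.SimpleFunction;
>     import org.apache.beam.sdk.transforms.Sum;
>     import org.apache.beam.sdk.values.KV;
>     import org.apache.beam.sdk.values.PCollection;
> 
>     // input: PCollection<KV<String, Long>> produced earlier in the pipeline.
>     // Stage 1: spread each key over up to 100 sub-keys and pre-aggregate.
>     PCollection<KV<String, Long>> partial = input
>         .apply("SaltKeys", MapElements.via(
>             new SimpleFunction<KV<String, Long>, KV<String, Long>>() {
>               @Override
>               public KV<String, Long> apply(KV<String, Long> kv) {
>                 int salt = ThreadLocalRandom.current().nextInt(100);
>                 return KV.of(kv.getKey() + "#" + salt, kv.getValue());
>               }
>             }))
>         .apply("PartialSum", Sum.longsPerKey());
> 
>     // Stage 2: strip the salt and merge the (at most 100) partial results.
>     PCollection<KV<String, Long>> totals = partial
>         .apply("StripSalt", MapElements.via(
>             new SimpleFunction<KV<String, Long>, KV<String, Long>>() {
>               @Override
>               public KV<String, Long> apply(KV<String, Long> kv) {
>                 String key = kv.getKey().substring(0, kv.getKey().lastIndexOf('#'));
>                 return KV.of(key, kv.getValue());
>               }
>             }))
>         .apply("FinalSum", Sum.longsPerKey());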
> 
> Apart from that, the problem seems to be, as Zhijiang indicated, that your 
> join result is quite large: a single record is more than 1 GB. Try to 
> decrease the record size or give more memory to your TMs, e.g. as shown 
> below.
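> 
> For example (the values are placeholders; adjust them to your cluster):
> 
>     # flink-conf.yaml: total heap per TaskManager
>     taskmanager.heap.size: 10g
> 
>     # or per job on YARN via the CLI (memory in MB, 2 slots per TM):
>     bin/flink run -m yarn-cluster -ytm 10240 -ys 2 ...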
> 
> Cheers,
> Till
> 
> On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole <akshaymend...@gmail.com 
> <mailto:akshaymend...@gmail.com>> wrote:
> Hi Zhijiang,
>                  Thanks for the quick reply. My concern is more about how 
> Flink performs joins of two skewed datasets. Pig 
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> and Spark 
> <https://wiki.apache.org/pig/PigSkewedJoinSpec> seem to support joins of 
> skewed datasets. The record size you mention in your reply is the size after 
> the join takes place, which in my use case is definitely going to be too 
> large to fit in a single task manager task slot's JVM heap. We want to know 
> whether there is a way in Flink to handle such skewed keys by distributing 
> their values across different JVMs. Let me know if you need more clarity on 
> the issue.
> Thanks, 
> Akshay 
> 
> On Thu, Nov 22, 2018 at 2:38 PM zhijiang <wangzhijiang...@aliyun.com 
> <mailto:wangzhijiang...@aliyun.com>> wrote:
> Hi Akshay,
> 
> You have run into a known issue where serializing large records causes an 
> OOM.
> 
> Previously, every subpartition created a separate serializer, and each 
> serializer maintained an internal byte array for storing intermediate 
> serialization results. The key point is that these internal byte arrays are 
> not managed by the framework, and they grow dynamically with the record 
> size. If your job has many subpartitions with large records, this can easily 
> cause an OOM; a simplified sketch of the pattern follows.
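> 
> To illustrate (this is a simplified sketch of the pattern only, NOT Flink's 
> actual serializer code):
> 
>     // Each serializer keeps a byte[] that grows to the largest record it
>     // has seen and never shrinks; the framework does not account for it.
>     class NaiveRecordSerializer {
>       private byte[] buffer = new byte[128];
> 
>       byte[] serialize(byte[] record) {
>         if (record.length > buffer.length) {
>           // Grows with the record size and stays allocated afterwards.
>           buffer = new byte[record.length];
>         }
>         System.arraycopy(record, 0, buffer, 0, record.length);
>         return buffer;
>       }
>     }
>     // With one serializer per subpartition, N subpartitions and a ~1 GB
>     // record mean up to N * 1 GB of unmanaged heap; sharing one serializer
>     // (FLINK-9913) caps the overhead at roughly one record's size.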
> 
> I already improved this to some extent by sharing a single serializer across 
> all subpartitions [1], which means there is at most one byte-array overhead. 
> That change is included in release 1.7.
> For now, the best options are to reduce your record size if possible or to 
> increase the heap size of the task manager container.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-9913 
> <https://issues.apache.org/jira/browse/FLINK-9913>
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Akshay Mendole <akshaymend...@gmail.com <mailto:akshaymend...@gmail.com>>
> Sent: Thursday, November 22, 2018, 13:43
> To: user <user@flink.apache.org <mailto:user@flink.apache.org>>
> Subject: OutOfMemoryError while doing join operation in flink
> 
> Hi,
>     We are converting one of our Pig pipelines to Flink using Apache Beam. 
> The Pig pipeline reads two different datasets (R1 & R2) from HDFS, enriches 
> them, joins them, and dumps the result back to HDFS. The dataset R1 is 
> skewed, in the sense that it has a few keys with a very large number of 
> records. When we converted the Pig pipeline to Apache Beam and ran it using 
> Flink on a production YARN cluster, we got the following error:
> 
> 2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask    
>               - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) 
> (25/100)
> java.lang.RuntimeException: Emitting the record caused an I/O exception: 
> Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM 
> heap space
>         at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
>         at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>         at 
> org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
>         at 
> org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
>         at 
> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
>         at 
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>         at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to serialize element. Serialized size 
> (> 1136656562 bytes) exceeds JVM heap space
>         at 
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
>         at 
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at 
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at 
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at 
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at 
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at 
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at 
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at 
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at 
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at 
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>         ... 9 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
>         at 
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>         at 
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
>         at 
> org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
>         at java.io.DataOutputStream.write(DataOutputStream.java:107)
>         at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>         at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>         at 
> java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
>         at 
> java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
>         at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
>         at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at 
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
>         at 
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
>         at 
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at 
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
>         at 
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
>         at 
> org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
>         at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
>         at 
> org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
>         at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
>         at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
>         at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
>         at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
>         at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
>         at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:83)
>         at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>         at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>         at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
>         at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>         at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> 
> 
> From the exception view in the Flink job manager dashboard, we could see 
> that this is happening at a join operation.
> When I say the R1 dataset is skewed, I mean that some keys occur as many as 
> 8,000,000 times, while most keys occur just once.
> In dataset R2, each key occurs at most once.
> Also, if we exclude the keys with a high number of occurrences, the pipeline 
> runs absolutely fine, which proves the problem is caused by these few keys 
> only. A simplified sketch of the join is below.
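> 
> For reference, the join is expressed as a CoGroupByKey (consistent with the 
> CoGBK/GBK step and the UnionCoder/IterableLikeCoder frames in the trace 
> above). A simplified sketch with placeholder names; the real job reads from 
> HDFS and uses different value types:
> 
>     import org.apache.beam.sdk.transforms.join.CoGbkResult;
>     import org.apache.beam.sdk.transforms.join.CoGroupByKey;
>     import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
>     import org.apache.beam.sdk.values.KV;
>     import org.apache.beam.sdk.values.PCollection;
>     import org.apache.beam.sdk.values.TupleTag;
> 
>     final TupleTag<String> r1Tag = new TupleTag<>();
>     final TupleTag<String> r2Tag = new TupleTag<>();
> 
>     PCollection<KV<String, String>> r1 = ...;  // enriched R1 (skewed side)
>     PCollection<KV<String, String>> r2 = ...;  // enriched R2
> 
>     // CoGroupByKey gathers, per key, ALL matching values from both sides
>     // into one grouped element, so a key with ~8M values on the R1 side
>     // becomes a single record of over 1 GB when serialized.
>     PCollection<KV<String, CoGbkResult>> joined =
>         KeyedPCollectionTuple.of(r1Tag, r1)
>             .and(r2Tag, r2)
>             .apply(CoGroupByKey.create());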
> 
> Hadoop version : 2.7.1
> Beam version : 2.8.0
> Flink Runner version : 2.8.0
> 
> Let me know what more information I should fetch and post here in order for 
> you to help me resolve this.
> 
> Thanks,
> Akshay
> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
