Re: Reading a Hive table in stream mode with DISTINCT causes a heap OOM

2021-04-26 Thread Shengkai Fang
Hi, could you tell me which version you are using? I just want to check
whether there are any known problems.

Best,
Shengkai

张颖 wrote on Sunday, April 25, 2021 at 5:23 PM:

> Hi, I ran into the following issue.
>
> This is my SQL:
> SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat
> FROM app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc where
> dt='2021-04-01'
>
>
> When I use the Blink planner in batch mode, it works well; but if I run it
> in stream mode, it causes a heap OOM.
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
> at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
> at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
> at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
> at java.lang.Thread.run(Thread.java:748)
>
>
> I use the RocksDB state backend, and I have confirmed it is in use; then I ran jmap on the TaskManager:
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:        214656     4420569368  [C
>    2:            99     2376771576  [B
>    3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
>    4:        214539        5148936  java.lang.String
>    5:         31796        2635104  [Ljava.lang.Object;
>    6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
>    7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
>    8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
>    9:         32812        2099968  java.nio.DirectByteBuffer
>   10:         14838        1651560  java.lang.Class
>   11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
>   12:         43014        1376448  java.util.Hashtable$Entry
>   13:         32805        1312200  sun.misc.Cleaner
>
>
> It looks like the data is kept on the heap rather than in RocksDB. Is there
> any way to make this state go into RocksDB?
>
>
>
>
>


Reading a Hive table in stream mode with DISTINCT causes a heap OOM

2021-04-25 Thread 张颖
Hi, I ran into the following issue.


This is my SQL:
SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat FROM 
app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc where dt='2021-04-01'




When I use the Blink planner in batch mode, it works well; but if I run it in
stream mode, it causes a heap OOM.
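
For reference, a minimal sketch of how the two planner modes are typically selected with the Table API (the class name is a placeholder and the Hive catalog registration is omitted; only the query itself is taken from this thread). In streaming mode the DISTINCT is planned as an unbounded group aggregation that keeps per-key state, which matches the GroupAggFunction frame in the stack trace below.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DistinctModeSketch {
    public static void main(String[] args) {
        // Batch mode: DISTINCT runs as a bounded aggregation over the ORC files.
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment batchEnv = TableEnvironment.create(batchSettings);

        // Streaming mode: the same DISTINCT becomes an unbounded group aggregation
        // backed by keyed state (GroupAggFunction in the stack trace below).
        EnvironmentSettings streamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment streamEnv = TableEnvironment.create(streamSettings);

        // Hive catalog setup omitted; the query is the one from this thread.
        streamEnv.executeSql(
                "SELECT DISTINCT header, label, reqsig, dense_feat, item_id_feat, user_id_feat"
                        + " FROM app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc"
                        + " WHERE dt = '2021-04-01'");
    }
}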


Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
at java.lang.Thread.run(Thread.java:748)



I use the RocksDB state backend, and I have confirmed it is in use; then I ran jmap on the TaskManager:
 num     #instances         #bytes  class name
----------------------------------------------
   1:        214656     4420569368  [C
   2:            99     2376771576  [B
   3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
   4:        214539        5148936  java.lang.String
   5:         31796        2635104  [Ljava.lang.Object;
   6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
   7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
   8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
   9:         32812        2099968  java.nio.DirectByteBuffer
  10:         14838        1651560  java.lang.Class
  11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
  12:         43014        1376448  java.util.Hashtable$Entry
  13:         32805        1312200  sun.misc.Cleaner



It looks like the data is kept on the heap rather than in RocksDB. Is there
any way to make this state go into RocksDB?
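
For completeness, a minimal sketch of how the RocksDB state backend is usually enabled programmatically (the checkpoint path is a placeholder; the same can be done via state.backend: rocksdb in flink-conf.yaml). Note that the stack trace above already shows RocksDBValueState, so the backend itself appears to be active; the allocation that fails happens while the composite state key is serialized on the JVM heap before it is handed to RocksDB.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint directory; any durable filesystem URI works.
        // The boolean flag enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // ... create a StreamTableEnvironment on top of `env` and submit the query ...
    }
}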