????????????????,????????:

     StreamQueryConfig queryConfig = tabEnv.queryConfig();
        queryConfig.withIdleStateRetentionTime(Time.seconds(20), 
Time.minutes(6));


        DataStream<Student> source = env.socketTextStream("localhost", 10028)
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String value) throws Exception {
                        String[] vals = value.split(",");
                        if (vals.length < 2) {
                            return null;
                        }
                        Student st = new Student();
                        st.stNo = vals[0];
                        st.name = vals[1];
                        return st;
                    }
                }).returns(Student.class);


        Table table = tabEnv.fromDataStream(source, "stNo, name");


        Table distinctTab = table.groupBy("stNo, name").select("stNo, 
name");//.select("name, name.count as cnt");


        DataStream<Tuple2<Boolean, Student>> distinctStream = 
tabEnv.toRetractStream(distinctTab, Student.class);


        DataStream<Student> distintOutStrem = distinctStream.map(tuple2 -> {
            if (tuple2.f0) {
                return tuple2.f1;
            }
            return null;
        }).filter(Objects::nonNull);


        Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, 
proctime.proctime");


        Table result = 
after.window(Tumble.over("10.seconds").on("proctime").as("w"))
                .groupBy("name, w")
                .select("name, name.count as cnt, w.start as wStart, w.end as 
wEnd, w.proctime as wProctime");


        DataStream<Result> resultStream = tabEnv.toAppendStream(result, 
Result.class);
        resultStream.print();
        env.execute(TestState.class.getSimpleName());



??????????????????????,????????????????????jvm??????????,????dump????????org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
 
????????????????,????????????????????????????TimerHeapInternalTimer????????????????????????????,??????????????????????
 num     #instances         #bytes  class name
----------------------------------------------
   1:          5937       44249552  [B
   2:        214238       18291832  [C
   3:        141199        5647960  
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   4:        213521        5124504  java.lang.String
   5:        118727        4397272  [Ljava.lang.Object;
   6:        108138        3460416  java.util.HashMap$Node
   7:         19440        1667688  [Ljava.util.HashMap$Node;
   8:         94253        1508048  org.apache.flink.types.Row
   9:         47066        1506112  
org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
  10:         12924        1426104  java.lang.Class
  11:            49        1229592  
[Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
  12:         48072        1153728  java.lang.Long
  13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
  14:          7772        1078360  [I
  15:         26591        1063640  java.util.LinkedHashMap$Entry
  16:         15301         856856  java.util.LinkedHashMap
  17:         11771         847512  java.lang.reflect.Field
  18:         13172         843008  java.nio.DirectByteBuffer
  19:          8570         754160  java.lang.reflect.Method
  20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
  21:         13402         643296  java.util.HashMap
  22:         12945         621360  
org.apache.flink.core.memory.HybridMemorySegment
  23:         13275         531000  sun.misc.Cleaner
  24:         15840         506880  com.esotericsoftware.kryo.Registration
  25:           393         450928  [Ljava.nio.ByteBuffer;
  26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
  27:         25852         413632  java.lang.Object
  28:         14137         339288  java.util.ArrayList
  29:          6410         307680  
org.apache.kafka.common.metrics.stats.SampledStat$Sample
  30:          4572         292608  
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  31:           392         288576  
[Ljava.util.concurrent.ConcurrentHashMap$Node;
  32:          8412         269184  org.apache.kafka.common.MetricName
  33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
  34:            72         268704  
[Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
  35:         10070         241680  
org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
  36:          9828         225040  [Ljava.lang.Class;
  37:          9360         224640  
com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
  38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2
  39:          2358         150912  org.apache.kafka.common.metrics.Sensor
  40:          1855         148400  java.lang.reflect.Constructor
  41:          1464         143936  [J
  42:          8764         140224  java.util.LinkedHashMap$LinkedEntrySet
  43:          1668         133440  
com.esotericsoftware.kryo.serializers.FieldSerializer

回复