With the minimum and maximum idle state retention times set on the StreamQueryConfig, the test code is as follows:
// Set the minimum and maximum idle state retention times on the query config.
StreamQueryConfig queryConfig = tabEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.seconds(20), Time.minutes(6));

// Parse comma-separated "stNo,name" lines from the socket into Student POJOs;
// malformed lines are mapped to null.
DataStream<Student> source = env.socketTextStream("localhost", 10028)
        .map(new MapFunction<String, Student>() {
            @Override
            public Student map(String value) throws Exception {
                String[] vals = value.split(",");
                if (vals.length < 2) {
                    return null;
                }
                Student st = new Student();
                st.stNo = vals[0];
                st.name = vals[1];
                return st;
            }
        }).returns(Student.class);

// Deduplicate by (stNo, name); this group-by keeps per-key state.
Table table = tabEnv.fromDataStream(source, "stNo, name");
Table distinctTab = table.groupBy("stNo, name").select("stNo, name"); //.select("name, name.count as cnt");

// The distinct query is updating, so convert it to a retract stream and keep
// only the insert (f0 == true) records.
DataStream<Tuple2<Boolean, Student>> distinctStream =
        tabEnv.toRetractStream(distinctTab, Student.class);
DataStream<Student> distinctOutStream = distinctStream.map(tuple2 -> {
    if (tuple2.f0) {
        return tuple2.f1;
    }
    return null;
}).filter(Objects::nonNull);

// Count names per 10-second processing-time tumbling window.
Table after = tabEnv.fromDataStream(distinctOutStream, "stNo, name, proctime.proctime");
Table result = after.window(Tumble.over("10.seconds").on("proctime").as("w"))
        .groupBy("name, w")
        .select("name, name.count as cnt, w.start as wStart, w.end as wEnd, w.proctime as wProctime");

DataStream<Result> resultStream = tabEnv.toAppendStream(result, Result.class);
resultStream.print();
env.execute(TestState.class.getSimpleName());
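The snippet above is only an excerpt; the surrounding class, the POJOs, and the environment setup are not shown. The following is a minimal sketch of that scaffolding, assuming a pre-1.10 Flink release where StreamQueryConfig and the string-expression Table API are still available. The Result field names are inferred from the aliases used in the select(...) call above, so the actual classes in the original job may differ.

import java.sql.Timestamp;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TestState {

    // POJO produced by the socket source and emitted by the retract stream.
    public static class Student {
        public String stNo;
        public String name;

        public Student() {}
    }

    // POJO that receives the windowed aggregation; toAppendStream maps table
    // columns to POJO fields by name, so these fields must match the aliases
    // name, cnt, wStart, wEnd and wProctime used in the select(...) call.
    public static class Result {
        public String name;
        public Long cnt;
        public Timestamp wStart;
        public Timestamp wEnd;
        public Timestamp wProctime;

        public Result() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // On releases before 1.9 this would be TableEnvironment.getTableEnvironment(env).
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // ... body of the snippet shown above ...
    }
}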
After the job had been running for a while, JVM heap usage kept growing. A heap dump showed a large number of org.apache.flink.streaming.api.operators.TimerHeapInternalTimer instances.
Even though idle state retention was configured as shown above, the number of TimerHeapInternalTimer objects kept increasing and was never released. The class histogram from the dump looked like this:
 num     #instances         #bytes  class name
----------------------------------------------
   1:          5937       44249552  [B
   2:        214238       18291832  [C
   3:        141199        5647960  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   4:        213521        5124504  java.lang.String
   5:        118727        4397272  [Ljava.lang.Object;
   6:        108138        3460416  java.util.HashMap$Node
   7:         19440        1667688  [Ljava.util.HashMap$Node;
   8:         94253        1508048  org.apache.flink.types.Row
   9:         47066        1506112  org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
  10:         12924        1426104  java.lang.Class
  11:            49        1229592  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
  12:         48072        1153728  java.lang.Long
  13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
  14:          7772        1078360  [I
  15:         26591        1063640  java.util.LinkedHashMap$Entry
  16:         15301         856856  java.util.LinkedHashMap
  17:         11771         847512  java.lang.reflect.Field
  18:         13172         843008  java.nio.DirectByteBuffer
  19:          8570         754160  java.lang.reflect.Method
  20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
  21:         13402         643296  java.util.HashMap
  22:         12945         621360  org.apache.flink.core.memory.HybridMemorySegment
  23:         13275         531000  sun.misc.Cleaner
  24:         15840         506880  com.esotericsoftware.kryo.Registration
  25:           393         450928  [Ljava.nio.ByteBuffer;
  26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
  27:         25852         413632  java.lang.Object
  28:         14137         339288  java.util.ArrayList
  29:          6410         307680  org.apache.kafka.common.metrics.stats.SampledStat$Sample
  30:          4572         292608  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  31:           392         288576  [Ljava.util.concurrent.ConcurrentHashMap$Node;
  32:          8412         269184  org.apache.kafka.common.MetricName
  33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
  34:            72         268704  [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
  35:         10070         241680  org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
  36:          9828         225040  [Ljava.lang.Class;
  37:          9360         224640  com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
  38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2
  39:          2358         150912  org.apache.kafka.common.metrics.Sensor
  40:          1855         148400  java.lang.reflect.Constructor
  41:          1464         143936  [J
  42:          8764         140224  java.util.LinkedHashMap$LinkedEntrySet
  43:          1668         133440  com.esotericsoftware.kryo.serializers.FieldSerializer