?????? ??????????????checkpoint??????????????????????????
continue trigger ---- ??: "user-zh"
??????????????checkpoint??????????????????????????
checkpointflink1.3/1.4+sql??checkpoint1??rocksdb15??30?? ??flink 1.3/1.4 ??state.backend.rocksdb.timer-service.factory ??rocksdb?? rocksdb?? ??
????????????????UDF????????????
??HBASE??kafkauserId??OK ---- ??: "user-zh"
?????? sql ????json?????????? ????????????
---- ??: "user-zh" https://issues.apache.org/jira/browse/FLINK-18590 Benchao Li
sql ????json?????????? ????????????
hi alljson(ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),)?? create table hiido_push_sdk_mq ( datas ARRAY
????jdbc connector????????
hi allsqlclickhouse??https://github.com/apache/flink/blob/d04872d2c6b7570ea3ba02f8fc4fca02daa96118/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java#L30, ??DerbyDialect??MySQLDialect??PostgresDialect??jdbcSQL??
?????? ??????savepoint????????????????????
UID??name ---- ??:"Sun.Zhu"<17626017...@163.com; :2020??6??23??(??) 5:18 ??:"user-zh@flink.apache.org" https://issues.apache.org/jira/browse/FLINK-5601 <https://issues.apache.org/jira/browse/FLINK-5601> Best, Congxian claylin <1012539...@qq.com> ??2020??6??23?? 2:44?? > ??watermark?? > > > > > -- -- > ??: "Congxian Qiu"
?????? ??????savepoint????????????????????
??savepoint?? flatmap??jobgraph ??flatmap??Rebalance?? ---- ??:"Congxian Qiu" https://issues.apache.org/jira/browse/FLINK-5601 <https://issues.apache.org/jira/browse/FLINK-5601> Best, Congxian claylin <1012539...@qq.com> ??2020??6??23?? 2:44?? ??watermark?? -- -- ??: "Congxian Qiu"
?????? ??????savepoint????????????????????
??watermark?? ---- ??:"Congxian Qiu"
?????? ??????savepoint????????????????????
1. savepoint??savepoint 2. ??window??tumble event time window 3. eventtimerecord ---- ??:"Congxian Qiu"
??????savepoint????????????????????
hi all??savepointevent time
回复:kafka connector从指定timestamp开始消费
目前版本不支持,我看1.11版本支持,其实可以自己修改支持 ---原始邮件--- 发件人: "Kyle Zhang"
?????? Flink SQL ???? nested Json ????
?? ?? ---- ??:"Benchao Li"
?????? Flink SQL ???? nested Json ????
??rowevent time create table my_source ( database varchar, maxwell_ts bigint, table varchar, data row< transaction_sn varchar, parent_id int, user_id int, amount int, reference_id varchar, status int, transaction_type int, merchant_id int, update_time int, create_time int ts AS CAST(FROM_UNIXTIME(create_time) AS TIMESTAMP(3)), // WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE ) with ( ... ) ---- ??:"Benchao Li"
?????? ?????? ????sql????????????idle??????????????????????????????
1.10??kafka??2-5M/s ---- ??:"LakeShen"https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time Best, LakeShen claylin <1012539...@qq.com ??2020??5??17?? 10:24?? ??10-15??key??kafka2-5M --nbsp;nbsp;-- ??:nbsp;"??"https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4 gt <https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4gt;; gt; gt; gt; gt; --amp;nbsp;amp;nbsp;-- gt; ??:amp;nbsp;"tison"
?????? ?????? ????sql????????????idle??????????????????????????????
??10-15??key??kafka2-5M ---- ??:"??" https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4 -- -- ??: "tison"
?????? ????sql????????????idle??????????????????????????????
https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4 ---- ??:"tison"
????sql????????????idle??????????????????????????????
sqlTableConfigidle timesst??flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9state??CREATE TABLE yy_yapmnetwork_original ( happenAt BIGINT, uid BIGINT, appId STRING, deviceId STRING, appVer STRING, dnsDur BIGINT, useGlb INT, hitCache INT, requestSize DOUBLE, responseSize DOUBLE, totalDur BIGINT, url STRING, statusCode INT, prototype STRING, netType STRING, traceId STRING, ts AS CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)), WATERMARK FOR ts AS ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101', 'connector.properties.group.id' = 'interface_success_rate_consumer', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ); create table request_latency_tbl ( app_id string, app_ver string, net_type string, prototype string, url string, status_code int, w_start string, success_cnt BIGINT, failure_cnt BIGINT, total_cnt BIGINT ) with( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=truecharacterEncoding=utf-8zeroDateTimeBehavior=convertToNullautoReconnect=true', 'connector.table' = 'request_latency_statistics', 'connector.username' = 'yapm_metrics', 'connector.password' = '1234456', 'connector.write.flush.max-rows' = '1000', 'connector.write.flush.interval' = '5s', 'connector.write.max-retries' = '2' ); create view request_1minutes_latency as select appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start, count(distinct traceId) filter (where statusCode in (200)) as successCnt, count(distinct traceId) filter (where statusCode not in (200)) as failureCnt, count(distinct traceId) as total_cnt from yy_yapmnetwork_original group by appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, 
'-MM-dd HH:mm'); insert into request_latency_tbl select * from request_1minutes_latency;
flink sql ????time window join ????
hi all??flink sqlwindow join
flink sql ????time window join ????
hi all??flink sqlwindow join
????Java????????????????
hi all??Java?? https://s2.ax1x.com/2020/03/03/34yyvT.png https://s2.ax1x.com/2020/03/03/34y5P1.png ??
?????? ??????????????????????????????????????rocksDB
1.10.0?? ??tps2rocksdb??44Gtps??rocksdb??io??rocksdb??rocksdb:low??rocksdb:high??2128?? ---- ??:"Yu Li" https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#memory-management [2] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide On Wed, 26 Feb 2020 at 23:27, claylin <1012539...@qq.com> wrote: Hi rocksDB??tps??10w??rocksDBsar -dp ??io?? Average: DEV tps rd_sec/s wr_sec/s avgrq-sz avgqu-sz await svctm %util Average: sda 285.36 2152.91 88322.99 317.06 21.48 75.27 0.58 16.60 ?? https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb Manager Memory??rocksDB??flush??compact??state ??
?????? ??????????????????????????????????????rocksDB
??1.10 /data/hadoop/tmp/nm-local-dir/usercache/flink-user/appcache/application_1582163794765_0026/flink-io-21b91014-03a5-4d58-bafc-7e5b73f11ed0??2??43G??44G?? jstack??rocksdbrocksdb??rocksdb:low??rocksdb:high??CPU1tpsio80??rocksdb??128?? ??yarn??node??32128G5??32??5 ---- ??:"Yun Tang"https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-total-sst-files-size ____ From: claylin <1012539...@qq.com Sent: Wednesday, February 26, 2020 23:27 To: user-zh https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdbManager Memory??rocksDB??flush??compact??state ??
??????????????????????????????????????rocksDB
Hi rocksDB??tps??10w??rocksDBsar -dp ??io?? Average: DEV tps rd_sec/s wr_sec/s avgrq-sz avgqu-sz await svctm %util Average: sda 285.36 2152.91 88322.99 317.06 21.48 75.27 0.58 16.60 ?? https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdb Manager Memory??rocksDB??flush??compact??state ??
????mq????????????????????????????????
Hi event timekafkatcp(??tcpRecv-Q) org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-108, groupId=push-life-cycle-trace-consumer-for-flink-1.1.0-no-checkpoint] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 213: org.apache.kafka.common.errors.DisconnectException.
?????? ????????????taskmanager.memory.network????
---- ??:"Xintong Song"
????????????taskmanager.memory.network????
taskmanager.memory.networkyarn??yarn sessionflink-conf.yaml??yarn-session
??checkpoint????????????
1.9.1checkpointsst?? 2020-01-16 19:29:39 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_8ea7af242b2bcc2d11daf69b5d588c4d_(31/32) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135) ... 6 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception. 
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 8 more Caused by: java.nio.file.NoSuchFileException: /data/hadoop/tmp/nm-local-dir/usercache/www-data/appcache/application_1579002711906_0001/flink-io-bd910c0d-03c7-48ff-8712-4e7059bac574/job_bbe797c8fcdf7c362bed774435ae5f86_op_KeyedProcessOperator_8ea7af242b2bcc2d11daf69b5d588c4d__31_32__uuid_7259cf96-aa16-423e-a356-dcac0a7859f2/db/19.sst - /data/hadoop/tmp/nm-local-dir/usercache/www-data/appcache/application_1579002711906_0001/flink-io-bd910c0d-03c7-48ff-8712-4e7059bac574/job_bbe797c8fcdf7c362bed774435ae5f86_op_KeyedProcessOperator_8ea7af242b2bcc2d11daf69b5d588c4d__31_32__uuid_7259cf96-aa16-423e-a356-dcac0a7859f2/40e6dc65-7fac-41ae-b736-91c4ecd5e296/19.sst at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476) at java.nio.file.Files.createLink(Files.java:1086) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212) at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270) ... 12 more dfscheckpoint
?????? ??????savepoint???????????? migration for MapState currently isn't supported.
1.9.1?? Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. ---- ??:"shuwen zhou" https://issues.apache.org/jira/browse/FLINK-11947 Best, Congxian claylin <1012539...@qq.com> ??2019??11??14?? 9:35?? > ??savepoint??1.8.1?? > java.lang.RuntimeException: Error while getting state at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > at > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.flink.util.StateMigrationException: The new serializer for a > MapState requires state migration in order for the job to proceed.
However, > migration for MapState currently isn't supported. at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > ... 9 more -- Best Wishes, Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
????????Queryable State ????????????????
?????? ??????savepoint???????????? migration for MapState currently isn't supported.
schema?? ---- ??:"Congxian Qiu"https://issues.apache.org/jira/browse/FLINK-11947 Best, Congxian claylin <1012539...@qq.com ??2019??11??14?? 9:35?? ??savepoint??1.8.1?? java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) at com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported. 
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) ... 9 more
??????savepoint???????????? migration for MapState currently isn't supported.
??savepoint??1.8.1?? java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) at com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported. 
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) ... 9 more
????????RocksDBStateBackend ????state.backend.rocksdb.ttl.compaction.filter.enabled ??????????
??RocksDBStateBackend??,state??(),??state.backend.rocksdb.ttl.compaction.filter.enabled,rocksdbcompact??ttl,??state,https://github.com/facebook/rocksdb/wiki/Time-to-Live,??,rocksdbcompact,??compact??,state??flinkttlOnCreateAndWrite??OnReadAndWrite??,,??ttl??1??,??state??ttl,??
????????org.apache.flink.streaming.api.operators.TimerHeapInternalTimer ???????????????? ??????????????????
,: StreamQueryConfig queryConfig = tabEnv.queryConfig(); queryConfig.withIdleStateRetentionTime(Time.seconds(20), Time.minutes(6)); DataStream source = env.socketTextStream("localhost", 10028) .map(new MapFunction() { @Override public Student map(String value) throws Exception { String[] vals = value.split(","); if (vals.length < 2) { return null; } Student st = new Student(); st.stNo = vals[0]; st.name = vals[1]; return st; } }).returns(Student.class); Table table = tabEnv.fromDataStream(source, "stNo, name"); Table distinctTab = table.groupBy("stNo, name").select("stNo, name");//.select("name, name.count as cnt"); DataStream> distinctStream = tabEnv.toRetractStream(distinctTab, Student.class); DataStream distintOutStrem = distinctStream.map(tuple2 -> { if (tuple2.f0) { return tuple2.f1; } return null; }).filter(Objects::nonNull); Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, proctime.proctime"); Table result = after.window(Tumble.over("10.seconds").on("proctime").as("w")) .groupBy("name, w") .select("name, name.count as cnt, w.start as wStart, w.end as wEnd, w.proctime as wProctime"); DataStream resultStream = tabEnv.toAppendStream(result, Result.class); resultStream.print(); env.execute(TestState.class.getSimpleName()); ??,jvm??,dumporg.apache.flink.streaming.api.operators.TimerHeapInternalTimer ,TimerHeapInternalTimer,?? 
num #instances #bytes class name -- 1: 5937 44249552 [B 2: 214238 18291832 [C 3: 141199 5647960 org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry 4: 213521 5124504 java.lang.String 5: 118727 4397272 [Ljava.lang.Object; 6: 108138 3460416 java.util.HashMap$Node 7: 19440 1667688 [Ljava.util.HashMap$Node; 8: 94253 1508048 org.apache.flink.types.Row 9: 47066 1506112 org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 10: 12924 1426104 java.lang.Class 11: 49 1229592 [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry; 12: 48072 1153728 java.lang.Long 13: 34657 1109024 java.util.concurrent.ConcurrentHashMap$Node 14: 7772 1078360 [I 15: 26591 1063640 java.util.LinkedHashMap$Entry 16: 15301 856856 java.util.LinkedHashMap 17: 11771 847512 java.lang.reflect.Field 18: 13172 843008 java.nio.DirectByteBuffer 19: 8570 754160 java.lang.reflect.Method 20: 20 655680 [Lscala.concurrent.forkjoin.ForkJoinTask; 21: 13402 643296 java.util.HashMap 22: 12945 621360 org.apache.flink.core.memory.HybridMemorySegment 23: 13275 531000 sun.misc.Cleaner 24: 15840 506880 com.esotericsoftware.kryo.Registration 25: 393 450928 [Ljava.nio.ByteBuffer; 26: 13166 421312 java.nio.DirectByteBuffer$Deallocator 27: 25852 413632 java.lang.Object 28: 14137 339288 java.util.ArrayList 29: 6410 307680 org.apache.kafka.common.metrics.stats.SampledStat$Sample 30: 4572 292608 com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField 31: 392 288576 [Ljava.util.concurrent.ConcurrentHashMap$Node; 32: 8412 269184 org.apache.kafka.common.MetricName 33: 8412 269184 org.apache.kafka.common.metrics.KafkaMetric 34: 72 268704 [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement; 35: 10070 241680 org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion 36: 9828 225040 [Ljava.lang.Class; 37: 9360 224640 com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry 38: 7905 189720 org.apache.flink.api.java.tuple.Tuple2 39: 2358 150912
org.apache.kafka.common.metrics.Sensor 40: 1855 148400 java.lang.reflect.Constructor 41: 1464 143936 [J