?????? ??????????????checkpoint??????????????????????????

2021-10-31 文章 claylin
continue trigger




----
??: 
   "user-zh"



??????????????checkpoint??????????????????????????

2021-10-28 文章 claylin
checkpointflink1.3/1.4+sql??checkpoint1??rocksdb15??30??
??flink 1.3/1.4 
??state.backend.rocksdb.timer-service.factory 
??rocksdb?? rocksdb??
??


????????????????UDF????????????

2021-09-23 文章 claylin
??HBASE??kafkauserId??OK


----
??: 
   "user-zh"



?????? sql ????josn?????????? ????????????

2020-07-17 文章 claylin





----
??: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-18590

Benchao Li 

sql ????josn?????????? ????????????

2020-07-17 文章 claylin
hi alljson(ts AS 
CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),)??
create table hiido_push_sdk_mq (
datas ARRAY

????jdbc connector????????

2020-07-06 文章 claylin
hi 
allsqlclickhouse??https://github.com/apache/flink/blob/d04872d2c6b7570ea3ba02f8fc4fca02daa96118/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java#L30,
 
??DerbyDialect??MySQLDialect??PostgresDialect??jdbcSQL??

?????? ??????savepoint????????????????????

2020-06-23 文章 claylin
UID??name




----
??:"Sun.Zhu"<17626017...@163.com;
:2020??6??23??(??) 5:18
??:"user-zh@flink.apache.org"https://issues.apache.org/jira/browse/FLINK-5601
<https://issues.apache.org/jira/browse/FLINK-5601>

Best,
Congxian


claylin <1012539...@qq.com> ??2020??6??23?? 2:44??

> ??watermark??
>
>
>
>
> --  --
> ??: "Congxian Qiu"

?????? ??????savepoint????????????????????

2020-06-23 文章 claylin
??savepoint??
flatmap??jobgraph
??flatmap??Rebalance??




----
??:"Congxian Qiu"https://issues.apache.org/jira/browse/FLINK-5601
<https://issues.apache.org/jira/browse/FLINK-5601>

Best,
Congxian


claylin <1012539...@qq.com> ??2020??6??23?? 2:44??

 
??watermark??




 --  --
 ??: "Congxian Qiu"

?????? ??????savepoint????????????????????

2020-06-23 文章 claylin
??watermark??




----
??:"Congxian Qiu"

?????? ??????savepoint????????????????????

2020-06-22 文章 claylin
1. 
savepoint??savepoint
2. ??window??tumble event time window
3. eventtimerecord




----
??:"Congxian Qiu"

??????savepoint????????????????????

2020-06-22 文章 claylin
hi 
all??savepointevent
 time

回复:kafka connector从指定timestamp开始消费

2020-06-11 文章 claylin
目前版本不支持,我看1.11版本支持,其实可以自己修改支持


 
---原始邮件---
发件人: "Kyle Zhang"

?????? Flink SQL ???? nested Json ????

2020-05-25 文章 claylin
??  ??




----
??:"Benchao Li"

?????? Flink SQL ???? nested Json ????

2020-05-25 文章 claylin
??rowevent 
time


create table my_source (
 database varchar,
 maxwell_ts bigint,
 table varchar,
 data row<
 transaction_sn varchar,
 parent_id int,
 user_id int,
 amount int,
 reference_id varchar,
 status int,
 transaction_type int,
 merchant_id int,
 update_time int,
 create_time int
  ts AS CAST(FROM_UNIXTIME(create_time) AS TIMESTAMP(3)), // 

  WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
 
) with (
 ...
)





----
??:"Benchao Li"

?????? ?????? ????sql????????????idle??????????????????????????????

2020-05-17 文章 claylin
1.10??kafka??2-5M/s




----
??:"LakeShen"https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen



claylin <1012539...@qq.com> ??2020??5??17?? 10:24??

 
??10-15??key??kafka2-5M


 --nbsp;nbsp;--
 ??:nbsp;"??"https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
 gt
 
<https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4gt;;

 gt;
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:amp;nbsp;"tison"

?????? ?????? ????sql????????????idle??????????????????????????????

2020-05-17 文章 claylin
??10-15??key??kafka2-5M


----
??:"??"https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
 
 
 
 
 --nbsp;nbsp;--
 ??:nbsp;"tison"

?????? ????sql????????????idle??????????????????????????????

2020-05-17 文章 claylin
https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4




----
??:"tison"

????sql????????????idle??????????????????????????????

2020-05-17 文章 claylin
sqlTableConfigidle 
timesst??flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9state??CREATE
 TABLE yy_yapmnetwork_original ( happenAt BIGINT, uid BIGINT, appId 
STRING, deviceId STRING, appVer STRING, dnsDur BIGINT, useGlb 
INT, hitCache INT, requestSize DOUBLE, responseSize DOUBLE, 
totalDur BIGINT, url STRING, statusCode INT, prototype STRING, 
netType STRING, traceId STRING, ts AS CAST(FROM_UNIXTIME(happenAt/1000) 
AS TIMESTAMP(3)), WATERMARK FOR ts AS ts - INTERVAL '20' SECOND )with ( 
'connector.type' = 'kafka', 'connector.version' = 'universal', 
'connector.topic' = 'yapm_metrics', 'connector.properties.zookeeper.connect' = 
'localhost:2181', 'connector.properties.bootstrap.servers' = 
'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101',
 'connector.properties.group.id' = 'interface_success_rate_consumer', 
'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ); create 
table request_latency_tbl ( app_id string, app_ver string, net_type 
string, prototype string, url string, status_code int, w_start 
string, success_cnt BIGINT, failure_cnt BIGINT, total_cnt BIGINT ) 
with( 'connector.type' = 'jdbc', 'connector.url' = 
'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
 'connector.table' = 'request_latency_statistics', 'connector.username' = 
'yapm_metrics', 'connector.password' = '1234456', 
'connector.write.flush.max-rows' = '1000', 'connector.write.flush.interval' = 
'5s', 'connector.write.max-retries' = '2' ); create view 
request_1minutes_latency  as select appId, appVer, netType, prototype, url, 
statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') w_start, count(distinct 
traceId) filter (where statusCode in (200)) as successCnt, count(distinct 
traceId) filter (where statusCode not in (200)) as failureCnt, 
count(distinct traceId) as total_cnt from yy_yapmnetwork_original group by 
appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd 
HH:mm'); insert into request_latency_tbl select * from  
request_1minutes_latency;

flink sql ????time window join ????

2020-04-03 文章 claylin
hi all??flink sqlwindow join

flink sql ????time window join ????

2020-04-03 文章 claylin
hi all??flink sqlwindow join

????Java????????????????

2020-03-03 文章 claylin
hi all??Java??


https://s2.ax1x.com/2020/03/03/34yyvT.png



https://s2.ax1x.com/2020/03/03/34y5P1.png
??

?????? ??????????????????????????????????????rocksDB

2020-02-26 文章 claylin
1.10.0??
??tps2rocksdb??44Gtps??rocksdb??io??rocksdb??rocksdb:low??rocksdb:high??2128??




----
??:"Yu Li"https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#memory-management
[2] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide


On Wed, 26 Feb 2020 at 23:27, claylin <1012539...@qq.com> wrote:

 Hi 
rocksDB??tps??10w??rocksDBsar
 -dp ??io??
 Average:          DEV       tps  rd_sec/s  wr_sec/s  avgrq-sz  avgqu-sz     await     svctm     %util
 Average:          sda    285.36   2152.91  88322.99    317.06     21.48     75.27      0.58     16.60
 ??
 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdbManager
 
Memory??rocksDB??flush??compact??state


 ??

?????? ??????????????????????????????????????rocksDB

2020-02-26 文章 claylin
??1.10


/data/hadoop/tmp/nm-local-dir/usercache/flink-user/appcache/application_1582163794765_0026/flink-io-21b91014-03a5-4d58-bafc-7e5b73f11ed0??2??43G??44G??


jstack??rocksdbrocksdb??rocksdb:low??rocksdb:high??CPU1tpsio80??rocksdb??128??


??yarn??node??32128G5??32??5




----
??:"Yun Tang"https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-total-sst-files-size




____
From: claylin <1012539...@qq.com>
Sent: Wednesday, February 26, 2020 23:27
To: user-zh https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdbManager
 
Memory??rocksDB??flush??compact??state


??

??????????????????????????????????????rocksDB

2020-02-26 文章 claylin
Hi 
rocksDB??tps??10w??rocksDBsar
 -dp ??io??
Average: DEV   
tps rd_sec/s wr_sec/s avgrq-sz avgqu-sz 
 await  svctm  %util
Average: sda  285.36 
2152.91 88322.99  317.06  21.48 
 75.27   0.58  16.60
??https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#tuning-rocksdbManager
 
Memory??rocksDB??flush??compact??state


??

????mq????????????????????????????????

2020-02-20 文章 claylin
Hi event 
timekafkatcp(??tcpRecv-Q)
org.apache.kafka.clients.FetchSessionHandler - [Consumer 
clientId=consumer-108, 
groupId=push-life-cycle-trace-consumer-for-flink-1.1.0-no-checkpoint] Error 
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 213: 
org.apache.kafka.common.errors.DisconnectException.




?????? ????????????taskmanager.memory.network????

2020-02-13 文章 claylin





----
??:"Xintong Song"

????????????taskmanager.memory.network????

2020-02-13 文章 claylin
taskmanager.memory.networkyarn??yarn
 
sessionflink-conf.yaml??yarn-session

??checkpoint????????????

2020-01-16 文章 claylin
1.9.1checkpointsst??
2020-01-16 19:29:39
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_8ea7af242b2bcc2d11daf69b5d588c4d_(31/32) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 more
Caused by: java.nio.file.NoSuchFileException: 
/data/hadoop/tmp/nm-local-dir/usercache/www-data/appcache/application_1579002711906_0001/flink-io-bd910c0d-03c7-48ff-8712-4e7059bac574/job_bbe797c8fcdf7c362bed774435ae5f86_op_KeyedProcessOperator_8ea7af242b2bcc2d11daf69b5d588c4d__31_32__uuid_7259cf96-aa16-423e-a356-dcac0a7859f2/db/19.sst
 - 
/data/hadoop/tmp/nm-local-dir/usercache/www-data/appcache/application_1579002711906_0001/flink-io-bd910c0d-03c7-48ff-8712-4e7059bac574/job_bbe797c8fcdf7c362bed774435ae5f86_op_KeyedProcessOperator_8ea7af242b2bcc2d11daf69b5d588c4d__31_32__uuid_7259cf96-aa16-423e-a356-dcac0a7859f2/40e6dc65-7fac-41ae-b736-91c4ecd5e296/19.sst
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at 
sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
at java.nio.file.Files.createLink(Files.java:1086)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 12 more



dfscheckpoint

?????? ??????savepoint???????????? migration for MapState currently isn't supported.

2019-11-19 文章 claylin
1.9.1??
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.



----
??:"shuwen zhou"https://issues.apache.org/jira/browse/FLINK-11947
 Best,
 Congxian


 claylin <1012539...@qq.com> ??2019??11??14?? 9:35??

 > ??savepoint??1.8.1??
 > java.lang.RuntimeException: Error while getting state
 >     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
 >     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
 >     at com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
 >     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
 >     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
 >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
 >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 >     at java.lang.Thread.run(Thread.java:748)
 > Caused by: org.apache.flink.util.StateMigrationException: The new serializer for a
 > MapState requires state migration in order for the job to proceed. However,
 > migration for MapState currently isn't supported.
 >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
 >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
 >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
 >     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
 >     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
 >     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
 >     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
 >     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
 >     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
 >     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
 >     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
 >     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
 >     ... 9 more



-- 
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>

????????Queryable State ????????????????

2019-11-14 文章 claylin


?????? ??????savepoint???????????? migration for MapState currently isn't supported.

2019-11-14 文章 claylin
schema??




----
??:"Congxian Qiu"https://issues.apache.org/jira/browse/FLINK-11947
Best,
Congxian


claylin <1012539...@qq.com> ??2019??11??14?? 9:35??

 
??savepoint??1.8.1??
 java.lang.RuntimeException: Error while getting state at
 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
 at
 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
 at
 
com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
 at
 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at
 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
 at
 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
 at
 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
 at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:748) Caused by:
 org.apache.flink.util.StateMigrationException: The new serializer for a
 MapState requires state migration in order for the job to proceed. However,
 migration for MapState currently isn't 
supported. at
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
 at
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
 at
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
 at
 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
 at
 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
 at
 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
 at
 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
 at
 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
 at
 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
 at
 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
 at
 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
 at
 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
 ... 9 more

??????savepoint???????????? migration for MapState currently isn't supported.

2019-11-14 文章 claylin
??savepoint??1.8.1??
java.lang.RuntimeException: Error while getting state   at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
   at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
   at 
com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
 at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) 
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.flink.util.StateMigrationException: The new serializer for a 
MapState requires state migration in order for the job to proceed. However, 
migration for MapState currently isn't supported.   at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
  at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
 at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
   at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
 at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
   at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
  at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
 at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
   at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
   at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
 at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
   at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
   ... 9 more

????????RocksDBStateBackend ????state.backend.rocksdb.ttl.compaction.filter.enabled ??????????

2019-10-11 文章 claylin
??RocksDBStateBackend??,state??(),??state.backend.rocksdb.ttl.compaction.filter.enabled,rocksdbcompact??ttl,??state,https://github.com/facebook/rocksdb/wiki/Time-to-Live,??,rocksdbcompact,??compact??,state??flinkttlOnCreateAndWrite??OnReadAndWrite??,,??ttl??1??,??state??ttl,??

????????org.apache.flink.streaming.api.operators.TimerHeapInternalTimer ???????????????? ??????????????????

2019-09-25 文章 claylin
,:

 StreamQueryConfig queryConfig = tabEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.seconds(20), 
Time.minutes(6));


DataStream<Student> source = env.socketTextStream("localhost", 10028)
.map(new MapFunction<String, Student>() {
@Override
public Student map(String value) throws Exception {
String[] vals = value.split(",");
if (vals.length < 2) {
return null;
}
Student st = new Student();
st.stNo = vals[0];
st.name = vals[1];
return st;
}
}).returns(Student.class);


Table table = tabEnv.fromDataStream(source, "stNo, name");


Table distinctTab = table.groupBy("stNo, name").select("stNo, 
name");//.select("name, name.count as cnt");


DataStream<Tuple2<Boolean, Student>> distinctStream = 
tabEnv.toRetractStream(distinctTab, Student.class);


DataStream<Student> distintOutStrem = distinctStream.map(tuple2 -> {
if (tuple2.f0) {
return tuple2.f1;
}
return null;
}).filter(Objects::nonNull);


Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, 
proctime.proctime");


Table result = 
after.window(Tumble.over("10.seconds").on("proctime").as("w"))
.groupBy("name, w")
.select("name, name.count as cnt, w.start as wStart, w.end as 
wEnd, w.proctime as wProctime");


DataStream<Result> resultStream = tabEnv.toAppendStream(result, 
Result.class);
resultStream.print();
env.execute(TestState.class.getSimpleName());



??,jvm??,dumporg.apache.flink.streaming.api.operators.TimerHeapInternalTimer
 
,TimerHeapInternalTimer,??
 num     #instances         #bytes  class name
----------------------------------------------
   1:          5937       44249552  [B
   2:        214238       18291832  [C
   3:        141199        5647960  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   4:        213521        5124504  java.lang.String
   5:        118727        4397272  [Ljava.lang.Object;
   6:        108138        3460416  java.util.HashMap$Node
   7:         19440        1667688  [Ljava.util.HashMap$Node;
   8:         94253        1508048  org.apache.flink.types.Row
   9:         47066        1506112  org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
  10:         12924        1426104  java.lang.Class
  11:            49        1229592  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
  12:         48072        1153728  java.lang.Long
  13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
  14:          7772        1078360  [I
  15:         26591        1063640  java.util.LinkedHashMap$Entry
  16:         15301         856856  java.util.LinkedHashMap
  17:         11771         847512  java.lang.reflect.Field
  18:         13172         843008  java.nio.DirectByteBuffer
  19:          8570         754160  java.lang.reflect.Method
  20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
  21:         13402         643296  java.util.HashMap
  22:         12945         621360  org.apache.flink.core.memory.HybridMemorySegment
  23:         13275         531000  sun.misc.Cleaner
  24:         15840         506880  com.esotericsoftware.kryo.Registration
  25:           393         450928  [Ljava.nio.ByteBuffer;
  26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
  27:         25852         413632  java.lang.Object
  28:         14137         339288  java.util.ArrayList
  29:          6410         307680  org.apache.kafka.common.metrics.stats.SampledStat$Sample
  30:          4572         292608  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  31:           392         288576  [Ljava.util.concurrent.ConcurrentHashMap$Node;
  32:          8412         269184  org.apache.kafka.common.MetricName
  33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
  34:            72         268704  [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
  35:         10070         241680  org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
  36:          9828         225040  [Ljava.lang.Class;
  37:          9360         224640  com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
  38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2
  39:          2358         150912  org.apache.kafka.common.metrics.Sensor
  40:          1855         148400  java.lang.reflect.Constructor
  41:          1464         143936  [J