?????? Processing-time temporal join is not supported yet
watermarkeventtimewatermarkkey ---- ??: "user-zh" https://issues.apache.org/jira/browse/FLINK-19830 <https://issues.apache.org/jira/browse/FLINK-19830> Leonard > ?? 2021??6??2317:03??op <520075...@qq.com.INVALID> ?? > > Processing-time temporal join is not supported yet.
?????? Processing-time temporal join is not supported yet
??Event time temporal join ??temporal??key??watermark?? ---- ??: "user-zh" https://issues.apache.org/jira/browse/FLINK-19830 <https://issues.apache.org/jira/browse/FLINK-19830> Leonard ?? 2021??6??2317:03??op <520075...@qq.com.INVALID> ?? Processing-time temporal join is not supported yet.
Processing-time temporal join is not supported yet
hi??kakatemporal join?? org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet. sql?? create view visioned_table as select user_id, event from (select user_id, event, row_number() over(partition by user_id order by event_time desc) as rn from kafka_table1 )ta where rn=1; select t1.*,t2.* from mvp_rtdwd_event_app_quit t1 join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2 on t1.user_id=t2.user_id where t1.user_id is not null
flink sql 1.12 minibatch??????
flink sql 1.12 minibatch?? val config = tConfig.getConfiguration() config.setString("table.exec.mini-batch.enabled", "true") // mini-batch is enabled config.setString("table.exec.mini-batch.allow-latency", "true") config.setString("table.exec.mini-batch.size", 100) config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation config.setString("table.optimizer.distinct-agg.split.enabled", "true") //not support user defined AggregateFunctionsql??tableEnv.executeSql( s"""insert into event_test |select | date_format(create_time,'MMdd') dt, | uid, | count(distinct fid) text_feed_count, | max(event_time) event_time |from basic_fd_base where ftype <'0' and uid is not null | group by | date_format(create_time,'MMdd'), | uid """.stripMargin).print()event_test print connectorbasic_fd_base kafka connectortimestampeventtimenullnull??2 +I(20210427,747873111868904192,1,2021-04-27T14:03:10) 2 +I(20210427,709531067945685120,1,null) 2 +I(20210427,759213633292150016,1,2021-04-27T13:59:01.923) 2 +I(20210427,758340406550406272,4,2021-04-27T14:02:14.553) 2 +I(20210427,759658063329437312,1,2021-04-27T14:02:18.305) 2 +I(20210427,737415823706231680,1,2021-04-27T14:02:11.061) 2 +I(20210427,xishuashu...@sohu.com,1,2021-04-27T14:05:37) 2 +I(20210427,759219266892539008,1,null) 2 +I(20210427,758349976605763328,1,2021-04-27T14:02:24.184) 2 -U(20210427,709531067945685120,1,null) 2 +U(20210427,709531067945685120,1,2021-04-27T14:09:27.156) 2 +I(20210427,751664239562922752,1,2021-04-27T14:16:51.133) 2 -U(20210427,759219266892539008,1,null) 2 +U(20210427,759219266892539008,1,2021-04-27T14:12:40.692) 2 +I(20210427,745540385069273984,1,2021-04-27T14:23:34) 2 +I(20210427,745399833011098240,1,2021-04-27T14:20:32.870) 2 +I(20210427,714590484395398016,1,2021-04-27T14:19:06) 2 +I(20210427,747859173236216832,1,2021-04-27T14:28:21.864) 2 +I(20210427,746212052309316608,1,null) 2 +I(20210427,666839205279797376,1,2021-04-27T14:26:36.743) 2 
+I(20210427,758334362541565568,3,2021-04-27T14:18:58.396) 2 +I(20210427,758325137706788480,1,2021-04-27T14:01:09.053) 2 +I(20210427,747837209624908800,1,2021-04-27T13:59:44.193) 2 -U(20210427,758388594254750720,1,2021-04-27T14:00:44.212) 2 +U(20210427,758388594254750720,4,2021-04-27T14:14:55) 2 +I(20210427,75946621079296,1,2021-04-27T14:25:59.019) 2 -U(20210427,762769243539450496,1,2021-04-27T14:04:29) 2 +U(20210427,762769243539450496,2,2021-04-27T14:04:29) 2 +I(20210427,720648040456852096,1,2021-04-27T14:19:38.680) 2 +I(20210427,750144041584368000,1,2021-04-27T14:29:25.621) 2 +I(20210427,713108045701517952,1,null) ??minibatchnull??
?????? ????upsert-kafka connector??????
??upsert-kafkasinkkeypartition??keyA??B??kafka, ??upsert-kafka??key??A??B??A ---- ??: "user-zh"
?????? ????upsert-kafka connector??????
??upsert-kafka??key ---- ??: "user-zh"
????upsert-kafka connector??????
?? upsert-kafka connector source??key??
flink api??????
flink??api?? 1.connectkeyedstream??key join?? 2.coprocessfunction ?? keyedcoprocessfunction
flink sql ?? count(distinct )????
??flinksqlcount (distinct??state??
flink1.11??Streaming File Sink????
flink1.11??Streaming File Sinkhdfsexactly-once
?????? ??????????????????????????????????
---- ??: "user-zh"
?????? ??????????????????????????????????
?? ---- ??: "user-zh"
??????????????????????????????????
??RestartStrategiesRestart??
?????? FlinkKafkaConsumer????
---- ??: "user-zh"
?????? FlinkKafkaConsumer????
FlinkKafkaConsumerKafkaConsumer??flinkkafka ---- ??: "user-zh"
FlinkKafkaConsumer????
hi, FlinkKafkaConsumer //--- val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment Env.setRestartStrategy(RestartStrategies.noRestart()) val consumerProps = new Properties() consumerProps.put("bootstrap.servers", brokers) consumerProps.put("group.id", "test1234") val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() Env.addSource(consumer).print() Env.execute()//---??topic??group.idtopickafka ??consumer groupKafkaConsumer??topicflink1.11flink-connector-kafka_2.11
?????? ?????? flink sql????????????
hi grouby count(*)?? ---- ??: "user-zh"
?????? flink sql????????????
??minIdleStateRetentionTime ?? val tConfig = tableEnv.getConfig tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10)) 1.11.0??sql??sessionid groupby count(*)??sessionid1 minibatch ---- ??: "user-zh"
flink sql????????????
Hi ??flink sql?? val config = tableConfig.getConfiguration() config.setString("table.exec.mini-batch.enabled", "true") config.setString("table.exec.mini-batch.allow-latency", "5s") config.setString("table.exec.mini-batch.size", "20") FsStateBackendRocksDBStateBackend??checkpoint ??checkpoint
?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????
1.10?? ---- ??: "user-zh" https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend Best, Congxian op <520075...@qq.com ??2020??8??5?? 4:03?? ??ttl?? val settings = EnvironmentSettings.newInstance().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(bsEnv, settings) val tConfig = tableEnv.getConfig tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450)) nbsp; nbsp; 1)3?? nbsp; nbsp; 2)RocksDB --nbsp;nbsp;-- ??: "user-zh" < qcx978132...@gmail.comgt;; :nbsp;2020??8??5??(??) 3:30 ??:nbsp;"user-zh"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7 gt < https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7gt ; Best, gt; Congxian gt; gt; gt; op <520075...@qq.comamp;gt; ??2020??8??3?? 2:18?? gt; gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; gt; amp;gt; 1.11.0hdfscheckpoint??checkpoint3?? gt; amp;gt; ?? day ?? id groupby gt; amp;gt; 7watermark?? 
gt; amp;gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440), gt; amp;gt; Time.minutes(1440+10)) gt; amp;gt; gt; amp;gt; gt; amp;gt; gt; amp;gt; gt; amp;gt; --amp;amp;nbsp;amp;amp;nbsp;-- gt; amp;gt; ??: gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; amp;nbsp; "user-zh" gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; amp;nbsp; < gt; amp;gt; 384939...@qq.comamp;amp;gt;; gt; amp;gt; :amp;amp;nbsp;2020??8??3??(??) 1:50 gt; amp;gt; ??:amp;amp;nbsp;"user-zh"http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;amp;gt; gt; amp;gt; gt; amp;gt; 18?? gt; amp;gt; < http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;amp;gt; gt; amp;gt; gt; amp;gt; checkpoints?? gt; amp;gt; < http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;amp;gt ; gt; amp;gt; gt; amp;gt; hdfs?? gt; amp;gt; < http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;amp;gt ; gt; amp;gt; gt; amp;gt; ?? 
gt; amp;gt; < gt; http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gt gt < http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gtgt ; ; gt; amp;gt; gt; amp;gt; gt; amp;gt; Congxian Qiu wrote gt; amp;gt; amp;amp;gt; Hiamp;amp;nbsp;amp;amp;nbsp; ?? gt; amp;gt; amp;amp;gt;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp; checkpoint gt; checkpoint gt; amp;gt; size ?? gt; amp;gt; amp;amp;gt; checkpoint hdfs ?? ls checkpoint gt; amp;gt; amp;amp;gt;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp; gt; ??state ?? gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; Best, gt; amp;gt; amp;amp;gt; Congxian gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; ?? <
?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????
?? ?? RocksDB StateBackend ---- ??: "user-zh" https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend Best, Congxian op <520075...@qq.com ??2020??8??5?? 4:03?? ??ttl?? val settings = EnvironmentSettings.newInstance().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(bsEnv, settings) val tConfig = tableEnv.getConfig tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450)) nbsp; nbsp; 1)3?? nbsp; nbsp; 2)RocksDB --nbsp;nbsp;-- ??: "user-zh" < qcx978132...@gmail.comgt;; :nbsp;2020??8??5??(??) 3:30 ??:nbsp;"user-zh"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7 gt <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7gt;; Best, gt; Congxian gt; gt; gt; op <520075...@qq.comamp;gt; ??2020??8??3?? 2:18?? gt; gt; amp;gt; amp;amp;nbsp; amp;amp;nbsp; gt; amp;gt; 1.11.0hdfscheckpoint??checkpoint3?? gt; amp;gt; ?? day ?? id groupby gt; amp;gt; 7watermark?? 
gt; amp;gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440), gt; amp;gt; Time.minutes(1440+10)) gt; amp;gt; gt; amp;gt; gt; amp;gt; gt; amp;gt; gt; amp;gt; --amp;amp;nbsp;amp;amp;nbsp;-- gt; amp;gt; ??: gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; amp;nbsp; "user-zh" gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; amp;nbsp; < gt; amp;gt; 384939...@qq.comamp;amp;gt;; gt; amp;gt; :amp;amp;nbsp;2020??8??3??(??) 1:50 gt; amp;gt; ??:amp;amp;nbsp;"user-zh"http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;amp;gt; gt; amp;gt; gt; amp;gt; 18?? gt; amp;gt; < http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;amp;gt; gt; amp;gt; gt; amp;gt; checkpoints?? gt; amp;gt; < http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;amp;gt; gt; amp;gt; gt; amp;gt; hdfs?? gt; amp;gt; < http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;amp;gt; gt; amp;gt; gt; amp;gt; ?? 
gt; amp;gt; < gt; http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gt gt <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;amp;gtgt;; ; gt; amp;gt; gt; amp;gt; gt; amp;gt; Congxian Qiu wrote gt; amp;gt; amp;amp;gt; Hiamp;amp;nbsp;amp;amp;nbsp; ?? gt; amp;gt; amp;amp;gt;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp; checkpoint gt; checkpoint gt; amp;gt; size ?? gt; amp;gt; amp;amp;gt; checkpoint hdfs ?? ls checkpoint gt; amp;gt; amp;amp;gt;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp; gt; ??state ?? gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; Best, gt; amp;gt; amp;amp;gt; Congxian gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; ?? < gt; amp;gt; gt; amp;gt; amp;amp;gt; 384939718@ gt; amp;gt; gt; amp;gt; amp;amp;gt;amp;amp;gt; ??2020??7??30?? 10:43?? gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt;amp;am
?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????
??ttl?? val settings = EnvironmentSettings.newInstance().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(bsEnv, settings) val tConfig = tableEnv.getConfig tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450)) 1)3?? 2)RocksDB ---- ??: "user-zh" https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7 Best, Congxian op <520075...@qq.comgt; ??2020??8??3?? 2:18?? gt; amp;nbsp; amp;nbsp; gt; 1.11.0hdfscheckpoint??checkpoint3?? gt; ?? day ?? id groupby gt; 7watermark?? gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440), gt; Time.minutes(1440+10)) gt; gt; gt; gt; gt; --amp;nbsp;amp;nbsp;-- gt; ??: gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; nbsp; "user-zh" gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; nbsp; < gt; 384939...@qq.comamp;gt;; gt; :amp;nbsp;2020??8??3??(??) 1:50 gt; ??:amp;nbsp;"user-zh"http://apache-flink.147419.n8.nabble.com/file/t793/6.pngamp;gt; gt; gt; 18?? gt; <http://apache-flink.147419.n8.nabble.com/file/t793/9.pngamp;gt; gt; gt; checkpoints?? gt; <http://apache-flink.147419.n8.nabble.com/file/t793/conf.pngamp;gt; gt; gt; hdfs?? gt; <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pngamp;gt; gt; gt; ?? gt; < http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pngamp;gt ; gt; gt; gt; Congxian Qiu wrote gt; amp;gt; Hiamp;nbsp;amp;nbsp; ?? gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; checkpoint checkpoint gt; size ?? 
gt; amp;gt; checkpoint hdfs ?? ls checkpoint gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; ??state ?? gt; amp;gt; gt; amp;gt; Best, gt; amp;gt; Congxian gt; amp;gt; gt; amp;gt; gt; amp;gt; ?? < gt; gt; amp;gt; 384939718@ gt; gt; amp;gt;amp;gt; ??2020??7??30?? 10:43?? gt; amp;gt; gt; amp;gt;amp;gt; ?? gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; flink1.11.1??backend20?? gt; amp;gt;amp;gt; ?? gt; amp;gt;amp;gt; StateBackend backend =new gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; gt; RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false); gt; amp;gt;amp;gt; StateBackend backend =new gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; gt; FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false); gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; RocksDBStateBackend?? gt; amp;gt;amp;gt; RocksDBStateBackend?? gt; amp;gt;amp;gt; amp;amp;lt; gt; http://apache-flink.147419.n8.nabble.com/file/t793/444.pngamp;amp;gt ; gt; amp;gt;amp;gt; FsStateBackend?? gt; amp;gt;amp;gt; amp;amp;lt; gt; http://apache-flink.147419.n8.nabble.com/file/t793/555.pngamp;amp;gt ; gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; gt; amp;gt;amp;gt; -- gt; amp;gt;amp;gt; Sent from: http://apache-flink.147419.n8.nabble.com/ gt <http://apache-flink.147419.n8.nabble.com/gt;; amp;gt;amp;gt; gt; gt; gt; gt; gt; gt; -- gt; Sent from: http://apache-flink.147419.n8.nabble.com/
?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????
FsStateBackend??5checkpoint??300ms ??1440minute??5 checkpoint shared group by??key?? 5 -- -- ??: "user-zh" https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7 Best, Congxian op <520075...@qq.com ??2020??8??3?? 2:18?? nbsp; nbsp; 1.11.0hdfscheckpoint??checkpoint3?? ?? day ?? id groupby 7watermark?? tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1440+10)) --nbsp;nbsp;-- ??: "user-zh" < 384939...@qq.comgt;; :nbsp;2020??8??3??(??) 1:50 ??:nbsp;"user-zh"http://apache-flink.147419.n8.nabble.com/file/t793/6.pnggt; 18?? <http://apache-flink.147419.n8.nabble.com/file/t793/9.pnggt; checkpoints?? <http://apache-flink.147419.n8.nabble.com/file/t793/conf.pnggt; hdfs?? <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.pnggt; ?? <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.pnggt; Congxian Qiu wrote gt; Hinbsp;nbsp; ?? gt;nbsp;nbsp;nbsp;nbsp; checkpoint checkpoint size ?? gt; checkpoint hdfs ?? ls checkpoint gt;nbsp;nbsp;nbsp;nbsp; ??state ?? gt; gt; Best, gt; Congxian gt; gt; gt; ?? < gt; 384939718@ gt;gt; ??2020??7??30?? 10:43?? gt; gt;gt; ?? gt;gt; gt;gt; flink1.11.1??backend20?? gt;gt; ?? gt;gt; StateBackend backend =new gt;gt; gt;gt; RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false); gt;gt; StateBackend backend =new gt;gt; gt;gt; FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false); gt;gt; gt;gt; gt;gt; RocksDBStateBackend?? gt;gt; RocksDBStateBackend?? gt;gt; amp;lt; http://apache-flink.147419.n8.nabble.com/file/t793/444.pngamp;gt; gt;gt; FsStateBackend?? gt;gt; amp;lt; http://apache-flink.147419.n8.nabble.com/file/t793/555.pngamp;gt; gt;gt; gt;gt; gt;gt; gt;gt; gt;gt; -- gt;gt; Sent from: http://apache-flink.147419.n8.nabble.com/ gt;gt; -- Sent from: http://apache-flink.147419.n8.nabble.com/
?????? flink1.10.1/1.11.1 ????sql ????group ?? ???????? ?????? ????????????
1.11.0hdfscheckpoint??checkpoint3?? ?? day ?? id groupby 7watermark?? tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1440+10)) ---- ??: "user-zh" <384939...@qq.com>; :2020??8??3??(??) 1:50 ??:"user-zh"http://apache-flink.147419.n8.nabble.com/file/t793/6.png; 18??
?????? StatementSet ??????????insertsql????
statement.execute??streamEnv.execute app?? ---- ??: "user-zh"
StatementSet ??????????insertsql????
??StatementSet ??2??insertsqlapplication?? ??sink ??
?????? Sql??kafka????????????????
?? 1.10??connector type ---- ??: "user-zh"
Sql??kafka????????????????
??sql??kafka??1.11??datastream Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count, SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11) AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])
?????? sql-client ??jdbc??????
---- ??: "user-zh"
?????? sql-client ??jdbc??????
?? yarn??,sql-client??1.11.0 ---- ??: "user-zh"
Hbase connector????????
habse??family1 INSERT INTO hTable SELECT rowkey, ROW(null,f1q1) FROM T;
sql-client ??jdbc??????
??jdbc CREATE TABLE mvp_dim_anticheat_args_all ( id BIGINT, dt STRING, cnt_7d INT, cnt_30d INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://localhost:3306/huyou_oi', 'table-name' = 'mvp_dim_ll', 'username' = 'huy_oi', 'password' = '420123' ); [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver lib??flink-connector-jdbc_2.11-1.11.0.jar ??mysql-connector-java-5.1.38.jar
flink state????
?? ??bloomfilter .keyBy(_._1).process(new KeyedProcessFunction[String,(String,String),String]() { var state:ValueState[BloomFilter[CharSequence]]= null override def open(parameters: Configuration): Unit = { val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new TypeHint[BloomFilter[CharSequence]](){})) state = getRuntimeContext.getState(stateDesc) } override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]) = { var filter = state.value if(filter==null){ println("null filter") filter= BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,1,0.0001)} //val contains = filter.mightContain(value._2) if(!filter.mightContain(value._2)) { filter.put(value._2) state.update(filter) out.collect(value._2) } } }) ??savepoint??state??bloomfilternull??
kafka connector????
kafka tablescan.startup.modeCREATE TABLE kafkaTable ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' )??'earliest-offset','latest-offset','group-offsets','timestamp'and'specific-offsets'??group-offsets??offset??kafka broker??savepoint??offset
?????? State??????guava Cache
?? 1?? roaringbitmap??state ---- ??:"Yichao Yang"<1048262...@qq.com; :2020??7??8??(??) 6:45 ??:"user-zh"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl amp;gt; Best, amp;gt; Congxian amp;gt; amp;gt; amp;gt; op <520075...@qq.comamp;amp;gt; ??2020??7??8?? 3:53?? amp;gt; amp;gt; amp;amp;gt; ??Cache?? amp;gt; amp;amp;gt; amp;gt; amp;amp;gt; amp;gt; amp;amp;gt; --amp;amp;amp;nbsp;amp;amp;amp;nbsp;-- amp;gt; amp;amp;gt; ??:amp;amp;amp;nbsp;"Congxian Qiu"
?????? State??????guava Cache
??idid??state idValueState[Cache]??id ??state??cache??id?? ---- ??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl Best, Congxian op <520075...@qq.com> ??2020??7??8?? 3:53?? > ??Cache?? > > > --  -- > ??: "Congxian Qiu"
?????? State??????guava Cache
??id??cacheid keyttlstate ttl?? ---- ??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl Best, Congxian op <520075...@qq.com> ??2020??7??8?? 3:53?? ??Cache?? --  -- ??: "Congxian Qiu"
?????? State??????guava Cache
??Cache?? ---- ??:"Congxian Qiu"
State??????guava Cache
ValueState[Cache]??value map??cacheputupdatestate??cache??1
?????? flink sql ??????kafka??????????????????????key??
---- ??:"Leonard Xu"https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell. <https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.> ?? 2020??7??717:01??op <520075...@qq.com> ?? hi?? flink sql ??kafka??key kafka connectorkey??
flink sql ??????kafka??????????????????????key??
hi?? flink sql ??kafka??key kafka connectorkey??
flink sql??????????????????
??sql?? select day, count(id), sum(v1) from ( select day , id , sum(v1) v1 from source group by day, id )t group by day tConfig.setIdleStateRetentionTime(Time.minutes(1440),Time.minutes(1450)) ??id??14??checkpoint ??1.10.0
?????? BLinkPlanner sql join????????
??Blinkplanner??oldplanner??1.10 package test.table.sql import java.util.Properties import com.souhu.msns.huyou.PublicParams import com.souhu.msns.huyou.utils.KafkaPbSchema import org.apache.flink.api.common.time.Time import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.streaming.api.windowing.time.{Time => WindowTime} import org.apache.flink.types.Row object test { def main(args: Array[String]): Unit = { // val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment bsEnv.setNumberOfExecutionRetries(1) bsEnv.setParallelism(1) //bsEnv.getConfig.setAutoWatermarkInterval(1) bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) bsEnv.setStateBackend(new FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ", true)) bsEnv.getCheckpointConfig.setCheckpointInterval(30) bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(6) bsEnv.setParallelism(3) bsEnv.setNumberOfExecutionRetries(1) //TABLE val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bstEnv = StreamTableEnvironment.create(bsEnv,setting) val tConfig = bstEnv.getConfig tConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(20)) val config = bstEnv.getConfig.getConfiguration() config.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled config.setString("table.exec.mini-batch.allow-latency", "5 s") config.setString("table.exec.mini-batch.size", "5000") config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. 
local-global aggregation config.setString("table.optimizer.distinct-agg.split.enabled", "true") //bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8")) //?? val kafkaProps = new Properties() kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers) val source = .toTable(bstEnv,'userId,'createTime.rowtime,'action,'circleName,'flowName,'ts,'content,'feedid,'postfeedid,'sessionId) bstEnv.createTemporaryView("source",source) val q1=bstEnv.sqlQuery( """select sessionId from source |where sessionId is not null |and action='P_TIMELINE'""".stripMargin) q1.toAppendStream[Row].print("source") bstEnv.createTemporaryView("sourcefeed",q1) val q2=bstEnv.sqlQuery( """select sessionId from source |where sessionId is not null |and action='V_TIMELINE_FEED'""".stripMargin) bstEnv.createTemporaryView("postfeed",q2) bstEnv.sqlQuery( """ |select count(b.sessionId) from |sourcefeed a |join postfeed b |on a.sessionId=b.sessionId """.stripMargin).toRetractStream[Row].print("") bstEnv.execute("") } } ---- ??:"Leonard Xu"
BLinkPlanner sql join????????
??oldPlannerIdleStateRetentionTime??join??blinkplannerbug
Flink sql ????????????
hi?? .. val tConfig = bstEnv.getConfigconfg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))..val q1=bstEnv.sqlQuery( """select createTime,feedid from source |where circleName is not null |and circleName not in('','_') |and action = 'C_FEED_EDIT_SEND' |""".stripMargin) bstEnv.createTemporaryView("sourcefeed",q1) val q2=bstEnv.sqlQuery( """select feedid,postfeedid,action from source |where circleName is not null |and circleName not in('','_') |and action in('C_PUBLISH','C_FORWARD_PUBLISH') |""".stripMargin) bstEnv.createTemporaryView("postfeed",q2) bstEnv.sqlQuery( """ |select count(b.postfeedid) from |sourcefeed a |join postfeed b |on a.feedid=b.postfeedid """.stripMargin).toRetractStream[Row](confg).print("") //25webid1??join??state
flink sql??hbase connector????
??: 1.?? hbase connector ?? hbase 1.4.3 2.??1.2.0??connector ??