Re: Processing-time temporal join is not supported yet

2021-06-23 Posted by op
Got it, so the versioned table needs an event-time watermark and a (primary) key. Thanks.




----
From: "user-zh"

> https://issues.apache.org/jira/browse/FLINK-19830
>
> Leonard
>
> > On 2021-06-23 17:03, op <520075...@qq.com.INVALID> wrote:
> > Processing-time temporal join is not supported yet.

Re: Processing-time temporal join is not supported yet

2021-06-23 Posted by op
Thanks. For an event-time temporal join, does the temporal (versioned) table need a primary key and a watermark defined?




----
From: "user-zh"

> https://issues.apache.org/jira/browse/FLINK-19830
>
> Leonard
>
> > On 2021-06-23 17:03, op <520075...@qq.com.INVALID> wrote:
> > Processing-time temporal join is not supported yet.

Processing-time temporal join is not supported yet

2021-06-23 Posted by op
Hi, when I run a temporal join against a Kafka-backed table I get the following exception:

org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.

The SQL is:


create view visioned_table as
select
  user_id,
  event
from (
  select
    user_id,
    event,
    row_number() over (partition by user_id order by event_time desc) as rn
  from kafka_table1
) ta where rn = 1;

select
  t1.*, t2.*
from mvp_rtdwd_event_app_quit t1
join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2
  on t1.user_id = t2.user_id
where t1.user_id is not null
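[Editor's note] As FLINK-19830 (linked above) explains, a processing-time temporal join against a changelog/view like this is not supported; the supported alternative is an event-time temporal join, where the versioned side declares a PRIMARY KEY plus a watermark and the probe side joins FOR SYSTEM_TIME AS OF its event-time attribute. A minimal hedged sketch follows; all table names, columns, topics and connector choices are placeholders, not taken from the original job:

import org.apache.flink.table.api._

object EventTimeTemporalJoinSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

    // Probe side: needs an event-time attribute with a watermark.
    tEnv.executeSql(
      """CREATE TABLE app_quit (
        |  user_id STRING,
        |  quit_time TIMESTAMP(3),
        |  WATERMARK FOR quit_time AS quit_time - INTERVAL '5' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'app_quit',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'json',
        |  'scan.startup.mode' = 'earliest-offset'
        |)""".stripMargin)

    // Versioned (build) side: needs a PRIMARY KEY and a watermark so the planner
    // can pick the row version that was valid at the probe row's event time.
    tEnv.executeSql(
      """CREATE TABLE user_events (
        |  user_id STRING,
        |  event STRING,
        |  event_time TIMESTAMP(3),
        |  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND,
        |  PRIMARY KEY (user_id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'user_events',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'json',
        |  'value.format' = 'json'
        |)""".stripMargin)

    // Event-time temporal join: FOR SYSTEM_TIME AS OF the probe side's rowtime column.
    tEnv.executeSql(
      """SELECT t1.user_id, t1.quit_time, t2.event
        |FROM app_quit t1
        |JOIN user_events FOR SYSTEM_TIME AS OF t1.quit_time AS t2
        |  ON t1.user_id = t2.user_id""".stripMargin).print()
  }
}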

Flink SQL 1.12 mini-batch question

2021-04-28 Posted by op
After enabling mini-batch in Flink SQL 1.12 I get unexpected results. My configuration:
val config = tConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true")               // enable mini-batch
config.setString("table.exec.mini-batch.allow-latency", "5 s")          // flush interval: must be a duration
config.setString("table.exec.mini-batch.size", "100")                   // setString expects a String value
config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")     // enable two-phase, i.e. local-global aggregation
config.setString("table.optimizer.distinct-agg.split.enabled", "true")  // not supported with user-defined AggregateFunctions

The SQL is:

tableEnv.executeSql(
  s"""insert into event_test
     |select
     |  date_format(create_time,'yyyyMMdd') dt,
     |  uid,
     |  count(distinct fid) text_feed_count,
     |  max(event_time) event_time
     |from basic_fd_base where ftype <> '0' and uid is not null
     |group by
     |  date_format(create_time,'yyyyMMdd'),
     |  uid
     |""".stripMargin).print()

event_test uses the print connector and basic_fd_base is a Kafka connector table; event_time is a timestamp (event-time) column. Although the source values are not all null, some output rows show a null event_time. The output looks like this (the leading "2" appears to be the print sink's parallel-subtask prefix):
 +I(20210427,747873111868904192,1,2021-04-27T14:03:10)
2 +I(20210427,709531067945685120,1,null)
2 +I(20210427,759213633292150016,1,2021-04-27T13:59:01.923)
2 +I(20210427,758340406550406272,4,2021-04-27T14:02:14.553)
2 +I(20210427,759658063329437312,1,2021-04-27T14:02:18.305)
2 +I(20210427,737415823706231680,1,2021-04-27T14:02:11.061)
2 +I(20210427,xishuashu...@sohu.com,1,2021-04-27T14:05:37)
2 +I(20210427,759219266892539008,1,null)
2 +I(20210427,758349976605763328,1,2021-04-27T14:02:24.184)
2 -U(20210427,709531067945685120,1,null)
2 +U(20210427,709531067945685120,1,2021-04-27T14:09:27.156)
2 +I(20210427,751664239562922752,1,2021-04-27T14:16:51.133)
2 -U(20210427,759219266892539008,1,null)
2 +U(20210427,759219266892539008,1,2021-04-27T14:12:40.692)
2 +I(20210427,745540385069273984,1,2021-04-27T14:23:34)
2 +I(20210427,745399833011098240,1,2021-04-27T14:20:32.870)
2 +I(20210427,714590484395398016,1,2021-04-27T14:19:06)
2 +I(20210427,747859173236216832,1,2021-04-27T14:28:21.864)
2 +I(20210427,746212052309316608,1,null)
2 +I(20210427,666839205279797376,1,2021-04-27T14:26:36.743)
2 +I(20210427,758334362541565568,3,2021-04-27T14:18:58.396)
2 +I(20210427,758325137706788480,1,2021-04-27T14:01:09.053)
2 +I(20210427,747837209624908800,1,2021-04-27T13:59:44.193)
2 -U(20210427,758388594254750720,1,2021-04-27T14:00:44.212)
2 +U(20210427,758388594254750720,4,2021-04-27T14:14:55)
2 +I(20210427,75946621079296,1,2021-04-27T14:25:59.019)
2 -U(20210427,762769243539450496,1,2021-04-27T14:04:29)
2 +U(20210427,762769243539450496,2,2021-04-27T14:04:29)
2 +I(20210427,720648040456852096,1,2021-04-27T14:19:38.680)
2 +I(20210427,750144041584368000,1,2021-04-27T14:29:25.621)
2 +I(20210427,713108045701517952,1,null)
Is this expected with mini-batch enabled? Why do some rows end up with a null event_time?

Re: Question about the upsert-kafka connector

2021-04-23 Posted by op
Got it. The upsert-kafka sink partitions records by key, so records with key A and key B may land in different Kafka partitions, and when the topic is read back through upsert-kafka the latest record for a key (e.g. A) replaces the earlier record for that key.







Re: Question about the upsert-kafka connector

2021-04-22 Posted by op
Thanks. So when reading with upsert-kafka, records are keyed by the Kafka message key?







Question about the upsert-kafka connector

2021-04-22 Posted by op
When the upsert-kafka connector is used as a source, does it read the Kafka message key and treat it as the table's primary key? (An illustrative DDL follows below.)
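[Editor's note] For reference, a minimal upsert-kafka sketch (Flink 1.12+); the topic, columns and formats are placeholders, not from the thread. The columns declared as PRIMARY KEY are serialized from/to the Kafka message key, and reading the topic yields an upsert changelog where the latest value per key wins:

import org.apache.flink.table.api._

object UpsertKafkaSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

    // PRIMARY KEY columns map to the Kafka message key; the remaining columns go to the value.
    tEnv.executeSql(
      """CREATE TABLE user_latest (
        |  user_id STRING,
        |  cnt BIGINT,
        |  PRIMARY KEY (user_id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'user_latest',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'json',
        |  'value.format' = 'json'
        |)""".stripMargin)

    // Reading the table back produces an upsert changelog keyed on user_id.
    tEnv.executeSql("SELECT * FROM user_latest").print()
  }
}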


Flink DataStream API questions

2021-04-07 Posted by op
Two questions about the Flink DataStream API:
1. If I connect two keyed streams, can I join them by key? (See the sketch below.)
2. What is the difference between CoProcessFunction and KeyedCoProcessFunction?
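[Editor's note] A minimal hedged sketch of joining two keyed streams via connect plus a KeyedCoProcessFunction (which is used on a connected, keyed stream, so each callback can access the current key's state); the stream contents are made-up placeholders:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ConnectKeyedStreamsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val left  = env.fromElements(("u1", "click"), ("u2", "view"))
    val right = env.fromElements(("u1", "beijing"), ("u2", "shanghai"))

    left.connect(right)
      .keyBy(_._1, _._1)    // both sides keyed on the same field: a key-wise "join"
      .process(new KeyedCoProcessFunction[String, (String, String), (String, String), String] {
        // Per-key state holding the latest value seen from the right stream.
        private var rightState: ValueState[String] = _

        override def open(parameters: Configuration): Unit =
          rightState = getRuntimeContext.getState(new ValueStateDescriptor[String]("right", classOf[String]))

        override def processElement1(l: (String, String), ctx: KeyedCoProcessFunction[String, (String, String), (String, String), String]#Context, out: Collector[String]): Unit = {
          val r = rightState.value()
          if (r != null) out.collect(s"${ctx.getCurrentKey}: ${l._2} @ $r")
        }

        override def processElement2(r: (String, String), ctx: KeyedCoProcessFunction[String, (String, String), (String, String), String]#Context, out: Collector[String]): Unit =
          rightState.update(r._2)
      })
      .print()

    env.execute("connect-keyed-streams-sketch")
  }
}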


Flink SQL count(distinct) question

2021-03-16 Posted by op
Does count(distinct ...) in a Flink SQL aggregation keep its state growing without bound? (A sketch of the usual mitigations follows below.)
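[Editor's note] A hedged sketch of the two knobs usually combined for this: idle state retention, so per-key aggregation state is eventually dropped, and split-distinct aggregation, which spreads the distinct state across buckets (it requires mini-batch). The table, column names and retention values are placeholders:

import org.apache.flink.api.common.time.Time
import org.apache.flink.table.api._

object CountDistinctStateSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

    // Drop per-key aggregation state that has been idle for roughly a day.
    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(25), Time.hours(26))

    val conf = tEnv.getConfig.getConfiguration
    // Split-distinct rewrites COUNT(DISTINCT ...) into a two-level aggregation;
    // it needs mini-batch enabled.
    conf.setString("table.exec.mini-batch.enabled", "true")
    conf.setString("table.exec.mini-batch.allow-latency", "5 s")
    conf.setString("table.exec.mini-batch.size", "5000")
    conf.setString("table.optimizer.distinct-agg.split.enabled", "true")

    // Placeholder source and query: distinct users per day.
    tEnv.executeSql("CREATE TABLE events (dt STRING, uid STRING) WITH ('connector' = 'datagen')")
    tEnv.executeSql("SELECT dt, COUNT(DISTINCT uid) AS uv FROM events GROUP BY dt").print()
  }
}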

Flink 1.11 Streaming File Sink question

2021-02-22 Posted by op
Can the StreamingFileSink in Flink 1.11 guarantee exactly-once semantics when writing to HDFS? (A sketch follows below.)
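[Editor's note] StreamingFileSink reaches exactly-once on HDFS through Flink's checkpoints: in-progress part files are only finalized when a checkpoint completes, so checkpointing must be enabled. A minimal hedged sketch; the path, interval and input data are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object StreamingFileSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Exactly-once relies on checkpointing: part files are committed on checkpoint completion.
    env.enableCheckpointing(60000)

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("hdfs:///tmp/streaming-file-sink-demo"),
                    new SimpleStringEncoder[String]("UTF-8"))
      .build()

    env.fromElements("a", "b", "c").addSink(sink)
    env.execute("streaming-file-sink-sketch")
  }
}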

?????? ??????????????????????????????????

2021-02-03 文章 op








Re: Job restart strategy question

2021-02-03 Posted by op
Thanks.







Job restart strategy question

2021-02-03 Posted by op
With a RestartStrategies configuration in place, why does the job not restart after a failure? (A configuration sketch follows below.)
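[Editor's note] A hedged sketch of configuring a restart strategy programmatically (all values are placeholders). If no strategy is set explicitly, Flink only restarts the job when checkpointing is enabled (a fixed-delay strategy with a very large attempt count); otherwise the job fails terminally:

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

object RestartStrategySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpointing also enables the default fixed-delay restart behaviour.
    env.enableCheckpointing(60000)

    // Explicit strategy: at most 3 attempts, 10 seconds apart (placeholder values).
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))

    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute("restart-strategy-sketch")
  }
}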

Re: FlinkKafkaConsumer question

2020-09-04 Posted by op
 







Re: FlinkKafkaConsumer question

2020-09-03 Posted by op
FlinkKafkaConsumer does not rely on the KafkaConsumer group mechanism; Flink assigns partitions and tracks offsets itself, so the group may not show up on the Kafka side.





FlinkKafkaConsumer question

2020-09-03 Posted by op
Hi, a question about FlinkKafkaConsumer:

//---
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
consumerProps.put("group.id", "test1234")

val consumer = new FlinkKafkaConsumer[String](topic, new KafkaStringSchema, consumerProps).setStartFromLatest()
bsEnv.addSource(consumer).print()

bsEnv.execute()
//---

The job reads the topic fine, but the consumer group "test1234" never shows up for this topic on the Kafka side, whereas a plain KafkaConsumer subscribing to the same topic does appear as a group. This is Flink 1.11 with flink-connector-kafka_2.11.
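[Editor's note] The FlinkKafkaConsumer assigns partitions itself and does not join Kafka's consumer-group coordination, so the group only becomes visible on the broker once offsets are committed back, which happens on checkpoint completion (or via the client's auto-commit when checkpointing is disabled). A hedged sketch; the topic and brokers are placeholders:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaOffsetCommitSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Offsets are committed to Kafka only when a checkpoint completes.
    env.enableCheckpointing(60000)

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "test1234")

    val consumer = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)
    consumer.setStartFromLatest()
    consumer.setCommitOffsetsOnCheckpoints(true)   // default is true; shown for clarity

    env.addSource(consumer).print()
    env.execute("kafka-offset-commit-sketch")
  }
}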

Re: Re: Flink SQL state keeps growing

2020-08-10 Posted by op
Hi,
it is just a group by with a count(*). Why would its state keep growing?







Re: Flink SQL state keeps growing

2020-08-09 Posted by op
I set the minIdleStateRetentionTime as follows:
val tConfig = tableEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10))
This is a 1.11.0 SQL job doing a group by on sessionid with count(*); most sessionids occur only once. Would enabling mini-batch help?





Flink SQL state keeps growing

2020-08-09 Posted by op
Hi,
my Flink SQL job uses the following configuration:
 val config = tableConfig.getConfiguration()
 config.setString("table.exec.mini-batch.enabled", "true")
 config.setString("table.exec.mini-batch.allow-latency", "5s")
 config.setString("table.exec.mini-batch.size", "20")
With FsStateBackend (and also with RocksDBStateBackend) the checkpoint size keeps growing over time. What should I check about the checkpoints?

Re: flink1.10.1/1.11.1 SQL group by: state and checkpoint size keep growing

2020-08-06 Posted by op
Is it the same situation on 1.10?




----
From: "user-zh"

> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
>
> Best,
> Congxian
 
 

Re: flink1.10.1/1.11.1 SQL group by: state and checkpoint size keep growing

2020-08-05 Posted by op
OK, thanks. I will try switching to the RocksDB StateBackend.



----
From: "user-zh"

> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
>
> Best,
> Congxian



Re: flink1.10.1/1.11.1 SQL group by: state and checkpoint size keep growing

2020-08-05 Posted by op
I have already set the idle-state TTL as follows:
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
val tConfig = tableEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))


1) It has been growing for about 3 days.
2) I am not using RocksDB.

----
From: "user-zh"

> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
>
> Best,
> Congxian



Re: flink1.10.1/1.11.1 SQL group by: state and checkpoint size keep growing

2020-08-05 Posted by op
I am using FsStateBackend; checkpoints run every 5 s and complete in roughly 300 ms.
The idle-state retention is 1440 minutes, yet after several days the checkpoint directory keeps growing, even though the number of distinct group-by keys is limited.




--  --
From: "user-zh"

> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> Best,
> Congxian


> <384939...@qq.com> wrote on 2020-08-03 13:50:
>
> > <http://apache-flink.147419.n8.nabble.com/file/t793/6.png>
> >
> > After 18 hours:
> > <http://apache-flink.147419.n8.nabble.com/file/t793/9.png>
> >
> > checkpoints configuration:
> > <http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>
> >
> > hdfs:
> > <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>
> >
> > <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>
> >
> > Congxian Qiu wrote
> > > Hi, please check what the checkpoint actually contains and which part of its size keeps growing:
> > > after the checkpoint is written to HDFS you can ls the checkpoint directory to see which files grow,
> > > and whether it is the state itself that is growing.
> > >
> > > Best,
> > > Congxian
> > >
> > > <384939718@...> wrote on 2020-07-30 10:43:
> > > > Hello,
> > > > on flink 1.11.1, after running for about 20 hours the checkpoint sizes of the two backends differ a lot:
> > > > StateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints-data/" + yamlReader.getValueByKey("jobName").toString() + "/", false);
> > > > StateBackend backend = new FsStateBackend("hdfs:///checkpoints-data/" + yamlReader.getValueByKey("jobName").toString() + "/", false);
> > > > RocksDBStateBackend checkpoint size:
> > > > <http://apache-flink.147419.n8.nabble.com/file/t793/444.png>
> > > > FsStateBackend checkpoint size:
> > > > <http://apache-flink.147419.n8.nabble.com/file/t793/555.png>
> > > >
> > > > --
> > > > Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.10.1/1.11.1 SQL group by: state and checkpoint size keep growing

2020-08-03 Posted by op
On 1.11.0, with checkpoints written to HDFS, the checkpoint size has kept growing for 3 days.
The query groups by day and id, keeps about 7 days of data with a watermark, and sets
tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1440+10))
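[Editor's note] Following the suggestion in the replies above (switch to the RocksDB state backend and use incremental snapshots), here is a hedged sketch; the HDFS path, checkpoint interval and sample pipeline are placeholders. With incremental checkpoints only the RocksDB files created since the previous checkpoint are uploaded:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object RocksDBIncrementalCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Second argument enables incremental checkpoints.
    val backend = new RocksDBStateBackend("hdfs:///checkpoints-data/my-job/", true)
    env.setStateBackend(backend)

    env.enableCheckpointing(5 * 60 * 1000)   // e.g. every 5 minutes

    env.fromElements(("a", 1), ("b", 2), ("a", 3))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute("rocksdb-incremental-checkpoint-sketch")
  }
}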






Re: StatementSet with multiple INSERT statements

2020-07-30 Posted by op
So I call statementSet.execute() instead of streamEnv.execute(), and the inserts are submitted as a single application?






StatementSet with multiple INSERT statements

2020-07-30 Posted by op
If I add two INSERT statements to a StatementSet, are they submitted as one application (a single job)?
Both write to sinks; how should I execute them? (A sketch follows below.)
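[Editor's note] A hedged sketch of bundling two INSERTs into one job via StatementSet; the tables and connectors are placeholders. statementSet.execute() submits all added statements together as a single job, so no separate env.execute() is needed for them:

import org.apache.flink.table.api._

object StatementSetSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

    tEnv.executeSql("CREATE TABLE src (id BIGINT, name STRING) WITH ('connector' = 'datagen')")
    tEnv.executeSql("CREATE TABLE sink_a (id BIGINT) WITH ('connector' = 'blackhole')")
    tEnv.executeSql("CREATE TABLE sink_b (name STRING) WITH ('connector' = 'blackhole')")

    // Both INSERTs are optimized and submitted together as one job.
    val statementSet = tEnv.createStatementSet()
    statementSet.addInsertSql("INSERT INTO sink_a SELECT id FROM src")
    statementSet.addInsertSql("INSERT INTO sink_b SELECT name FROM src")
    statementSet.execute()   // single submission; no separate env.execute() needed
  }
}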


Re: Writing SQL query results to Kafka

2020-07-29 Posted by op
Thanks, understood; on 1.10 it comes down to the connector type.





Writing SQL query results to Kafka

2020-07-28 Posted by op
Writing the result of a SQL aggregation to Kafka fails on 1.11 with the exception below. Is there a way to do this in SQL without falling back to the DataStream API?





Exception in thread "main" org.apache.flink.table.api.TableException: Table 
sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support 
consuming update changes which is produced by node GroupAggregate(groupBy=[dt, 
user_id], select=[dt, user_id, SUM($f2) AS text_feed_count, SUM($f3) AS 
picture_feed_count, SUM($f4) AS be_comment_forward_user_count, SUM($f5) AS 
share_link_count, SUM($f6) AS share_music_count, SUM($f7) AS share_video_count, 
SUM($f8) AS follow_count, SUM($f9) AS direct_post_count, SUM($f10) AS 
comment_post_count, SUM($f11) AS comment_count, SUM($f12) AS fans_count, 
MAX(event_time) AS event_time])

Re: sql-client JDBC connector question

2020-07-27 Posted by op







Re: sql-client JDBC connector question

2020-07-27 Posted by op
OK.
This is running on YARN, with the sql-client from 1.11.0.



HBase connector question

2020-07-27 Posted by op
For an HBase table whose column family family1 has several qualifiers, can I write only some of them, e.g. leaving one qualifier null:

INSERT INTO hTable SELECT rowkey, ROW(null, f1q1) FROM T;


sql-client JDBC connector question

2020-07-27 Posted by op
Creating a JDBC table from the sql-client fails. The DDL is:


CREATE TABLE mvp_dim_anticheat_args_all (
  id BIGINT,
  dt STRING,
  cnt_7d INT,
  cnt_30d INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/huyou_oi',
  'table-name' = 'mvp_dim_ll',
  'username' = 'huy_oi',
  'password' = '420123'
);




[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver





The lib directory already contains flink-connector-jdbc_2.11-1.11.0.jar and mysql-connector-java-5.1.38.jar, yet the driver class still cannot be found.


Flink state question

2020-07-16 Posted by op
Hello,
I keep a BloomFilter in keyed state for deduplication:
.keyBy(_._1).process(new KeyedProcessFunction[String, (String, String), String]() {
  // Per-key state holding a guava BloomFilter used for deduplication.
  var state: ValueState[BloomFilter[CharSequence]] = _

  override def open(parameters: Configuration): Unit = {
    val stateDesc = new ValueStateDescriptor("state",
      TypeInformation.of(new TypeHint[BloomFilter[CharSequence]]() {}))
    state = getRuntimeContext.getState(stateDesc)
  }

  override def processElement(value: (String, String),
                              ctx: KeyedProcessFunction[String, (String, String), String]#Context,
                              out: Collector[String]): Unit = {
    var filter = state.value
    if (filter == null) {
      println("null filter")
      filter = BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel, 1, 0.0001)
    }
    if (!filter.mightContain(value._2)) {
      filter.put(value._2)
      state.update(filter)
      out.collect(value._2)
    }
  }
})
After restoring the job from a savepoint, state.value() returns null (the "null filter" branch is hit), so the BloomFilter seems to be lost. Why does that happen?

Kafka connector question

2020-07-08 Posted by op
About scan.startup.mode on a Kafka table:

CREATE TABLE kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
)

The mode can be 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' or 'specific-offsets'. With 'group-offsets', are the starting offsets taken from the committed offsets on the Kafka broker, or from the offsets stored in a savepoint when restoring?

Re: Storing a guava Cache in State

2020-07-08 Posted by op
Thanks.
One more question: would keeping a RoaringBitmap in state be a reasonable alternative for this kind of deduplication?




----
From: "Yichao Yang" <1048262...@qq.com>

> (quoting Congxian Qiu)
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
> Best,
> Congxian

Re: Storing a guava Cache in State

2020-07-08 Posted by op
The use case is de-duplication by id. Should I keep one piece of state per id, or key the stream and keep a ValueState[Cache] whose cache holds the ids seen so far?



----
From: "Congxian Qiu"

> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
> Best,
> Congxian

Re: Storing a guava Cache in State

2020-07-08 Posted by op
Each key's cache would hold the ids seen for that key. Is the per-entry expiry of the guava Cache effectively the same as using state TTL here?




----
From: "Congxian Qiu"

> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
> Best,
> Congxian

Re: Storing a guava Cache in State

2020-07-08 Posted by op
Can a ValueState hold a guava Cache as its value?



Storing a guava Cache in State

2020-07-07 Posted by op
Can I use a ValueState[Cache] with a guava Cache as the value? In a map function I would put entries into the cache and then update the state with it, relying on the cache's expiry to bound its size. Is that a reasonable pattern? (A sketch of the state-TTL alternative follows below.)
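[Editor's note] Following the replies pointing at the state TTL documentation, a hedged sketch of letting Flink expire the de-duplication state itself instead of wrapping a guava Cache in a ValueState; the key choice and TTL value are placeholders. This keeps expiry consistent with checkpoints and savepoints:

import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object DedupWithStateTtlSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(("u1", "id1"), ("u1", "id1"), ("u1", "id2"))
      .keyBy(_._2)                                  // key directly by the id to de-duplicate
      .process(new KeyedProcessFunction[String, (String, String), String] {
        private var seen: ValueState[java.lang.Boolean] = _

        override def open(parameters: Configuration): Unit = {
          val ttl = StateTtlConfig
            .newBuilder(Time.hours(24))             // placeholder TTL
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .cleanupInRocksdbCompactFilter(1000)    // background cleanup when using RocksDB
            .build()
          val desc = new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean])
          desc.enableTimeToLive(ttl)                // Flink expires the entry instead of a guava Cache
          seen = getRuntimeContext.getState(desc)
        }

        override def processElement(value: (String, String),
                                    ctx: KeyedProcessFunction[String, (String, String), String]#Context,
                                    out: Collector[String]): Unit = {
          if (seen.value() == null) {               // first time this id is seen within the TTL window
            seen.update(java.lang.Boolean.TRUE)
            out.collect(value._2)
          }
        }
      })
      .print()

    env.execute("dedup-with-state-ttl-sketch")
  }
}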

Re: How can Flink SQL read the Kafka message key?

2020-07-07 Posted by op





----
From: "Leonard Xu"

> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
>
> > On 2020-07-07 17:01, op <520075...@qq.com> wrote:
> > Hi,
> > in Flink SQL, when reading from Kafka, how can I get the message key?
> > Does the Kafka connector support reading the key?

How can Flink SQL read the Kafka message key?

2020-07-07 Posted by op
Hi,
in Flink SQL, when reading from Kafka, how can I access the message key? Does the Kafka connector expose the key as a column? (A sketch follows below.)
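[Editor's note] Leonard's pointer above is FLIP-107, which later landed (Flink 1.12+) as the 'key.format' / 'key.fields' options of the Kafka SQL connector. A hedged sketch, assuming a Flink version where FLIP-107 is available; the topic, columns and formats are placeholders:

import org.apache.flink.table.api._

object KafkaKeyFieldsSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

    // 'key.fields' maps the listed physical columns onto the Kafka message key.
    tEnv.executeSql(
      """CREATE TABLE kafka_with_key (
        |  user_id STRING,               -- read from the message key
        |  event   STRING,               -- read from the message value
        |  ts      TIMESTAMP(3)
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'events',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'raw',
        |  'key.fields' = 'user_id',
        |  'value.format' = 'json',
        |  'value.fields-include' = 'EXCEPT_KEY',
        |  'scan.startup.mode' = 'earliest-offset'
        |)""".stripMargin)

    tEnv.executeSql("SELECT user_id, event FROM kafka_with_key").print()
  }
}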



Flink SQL state retention question

2020-07-05 Posted by op
I have the following SQL:

select day,
       count(id),
       sum(v1)
from (
  select day,
         id,
         sum(v1) v1
  from source
  group by day, id
) t
group by day



and I set
tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))


The ids are mostly distinct, and after about 14 days the checkpoint size is still growing.
The version is 1.10.0.

Re: Blink planner SQL join state question

2020-06-11 Posted by op
I am on 1.10; the issue shows up with the Blink planner but not with the old planner. The full program:




package test.table.sql


import java.util.Properties


import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.windowing.time.{Time => WindowTime}
import org.apache.flink.types.Row




object test {


 def main(args: Array[String]): Unit = {


  
      // create the execution environment
  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
  bsEnv.setNumberOfExecutionRetries(1)
  bsEnv.setParallelism(1)
  //bsEnv.getConfig.setAutoWatermarkInterval(1)
  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  bsEnv.setStateBackend(new 
FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ", 
true))
  bsEnv.getCheckpointConfig.setCheckpointInterval(30)
  bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(6)
  bsEnv.setParallelism(3)
  bsEnv.setNumberOfExecutionRetries(1)


  
      // create the table environment (Blink planner)


  val setting = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val bstEnv = StreamTableEnvironment.create(bsEnv,setting)
  val tConfig = bstEnv.getConfig
  
tConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(20))
  val config = bstEnv.getConfig.getConfiguration()
  config.setString("table.exec.mini-batch.enabled", "true") // 
local-global aggregation depends on mini-batch is enabled
  config.setString("table.exec.mini-batch.allow-latency", "5 s")
  config.setString("table.exec.mini-batch.size", "5000")
  config.setString("table.optimizer.agg-phase-strategy", 
"TWO_PHASE") // enable two-phase, i.e. local-global aggregation
  config.setString("table.optimizer.distinct-agg.split.enabled", 
"true")
  //bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8"))



  
      // Kafka source
  val kafkaProps = new Properties()
  kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers)
      val source =   // (Kafka consumer / DataStream construction omitted in the original message)
        .toTable(bstEnv,'userId,'createTime.rowtime,'action,'circleName,'flowName,'ts,'content,'feedid,'postfeedid,'sessionId)


  bstEnv.createTemporaryView("source",source)


  val q1=bstEnv.sqlQuery(
   """select sessionId from source
|where sessionId is not null
|and action='P_TIMELINE'""".stripMargin)
   q1.toAppendStream[Row].print("source")
  bstEnv.createTemporaryView("sourcefeed",q1)

  val q2=bstEnv.sqlQuery(
   """select sessionId from source
|where sessionId is not null
|and action='V_TIMELINE_FEED'""".stripMargin)
  bstEnv.createTemporaryView("postfeed",q2)

  bstEnv.sqlQuery(
   """
|select count(b.sessionId) from
|sourcefeed a
|join postfeed b
|on a.sessionId=b.sessionId
   """.stripMargin).toRetractStream[Row].print("")




  bstEnv.execute("")
 }
}









----
??:"Leonard Xu"

Blink planner SQL join state question

2020-06-11 Posted by op
With the old planner, IdleStateRetentionTime takes effect for a regular join, but with the Blink planner the join state does not seem to be cleaned up. Is this a Blink planner bug?

Flink SQL state cleanup question

2020-06-10 Posted by op
Hi,
I set the idle state retention as follows, but the join state does not seem to be cleaned up:

..
val tConfig = bstEnv.getConfig
confg.withIdleStateRetentionTime(Time.minutes(10), Time.minutes(25))
..
val q1 = bstEnv.sqlQuery(
  """select createTime,feedid from source
|where circleName is not null
|and circleName not in('','_')
|and action = 'C_FEED_EDIT_SEND'
|""".stripMargin)
 bstEnv.createTemporaryView("sourcefeed",q1)
val q2=bstEnv.sqlQuery(
  """select feedid,postfeedid,action from source
|where circleName is not null
|and circleName not in('','_')
|and action in('C_PUBLISH','C_FORWARD_PUBLISH')
|""".stripMargin)

bstEnv.createTemporaryView("postfeed",q2)
bstEnv.sqlQuery(
  """
|select count(b.postfeedid) from
|sourcefeed a
|join postfeed b
|on a.feedid=b.postfeedid
  """.stripMargin).toRetractStream[Row](confg).print("")
// Even after 25 minutes, the join operator's state shown in the web UI has not shrunk.

Flink SQL HBase connector question

2020-05-29 Posted by op
Two questions:

  1. Does the current HBase connector work with HBase 1.4.3?
  2. Is there a connector build for HBase 1.2.0?

Thanks