----
??:
"user-zh"
https://issues.apache.org/jira/browse/FLINK-19830
<https://issues.apache.org/jira/browse/FLINK-19830>
Leonard
gt; ?? 2021??6??2317:03??op <520075...@
??
----
??:
"user-zh"
https://issues.apache.org/jira/browse/FLINK-19830
<https://issues.apache.org/jira/browse/FLINK-19830>
Leonard
?? 2021??6??2317:03??op <520075...@qq.com.INVALID ??
hi??kakatemporal join??
org.apache.flink.table.api.TableException: Processing-time temporal join is not
supported yet.
sql??
create view visioned_table as
select
user_id,
event
from
(select
user_id,
event,
flink sql 1.12 minibatch??
val config = tConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true") // mini-batch is
enabled
config.setString("table.exec.mini-batch.allow-latency", "true")
config.setString("table.exec.mini-batch.size", 100)
??upsert-kafkasinkkeypartition??keyA??B??kafka,
??upsert-kafka??key??A??B??A
----
??:
??upsert-kafka??key
----
??:
"user-zh"
?? upsert-kafka connector
source??key??
flink??api??
1.connectkeyedstream??key join??
2.coprocessfunction ?? keyedcoprocessfunction
??flinksqlcount (distinct??state??
flink1.11??Streaming File
Sinkhdfsexactly-once
----
??:
"user-zh"
??
----
??:
"user-zh"
??RestartStrategiesRestart??
----
??:
"user-zh"
FlinkKafkaConsumerKafkaConsumer??flinkkafka
----
??:
"user-zh"
hi, FlinkKafkaConsumer
//---
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
Env.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
Hi, I am confused about the consumer group of
FlinkKafkaConsumer. I have two applications with the same
code, like this:
//---
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
Env.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new
hi
grouby count(*)??
----
??:
"user-zh"
??minIdleStateRetentionTime ??
val tConfig = tableEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10))
1.11.0??sql??sessionid groupby
count(*)??sessionid1
Hi
??flink sql??
val config = tableConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true")
config.setString("table.exec.mini-batch.allow-latency", "5s")
config.setString("table.exec.mini-batch.size", "20")
er-zh"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
Best,
Congxian
op <520075...@qq.com ??2020??8??5?? 4:03??
??ttl??
val settings =
EnvironmentSetting
er-zh"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
Best,
Congxian
op <520075...@qq.com ??2020??8??5?? 4:03??
??ttl??
er-zh"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian
op <520075...@qq.com> ??2020??8??3?? 2:18??
>
>
1.11.
er-zh"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian
op <520075...@qq.com ??2020??8??3?? 2:18??
1.11.0hdfscheckpoint??checkpoint3??
?? day ?? id groupby
7watermark??
tConfig.setIdleStateRetentionTime(Time.minutes(1440),
statement.execute??streamEnv.execute
app??
----
??:
??StatementSet ??2??insertsqlapplication??
??sink
??
?? 1.10??connector type
----
??:
"user-zh"
??sql??kafka??1.11??datastream
Exception in thread "main" org.apache.flink.table.api.TableException: Table
sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support
consuming update changes which
----
??:
"user-zh"
??
yarn??,sql-client??1.11.0
----
??:
"user-zh"
habse??family1
INSERT INTO hTable SELECT rowkey, ROW(null,f1q1) FROM T;
??jdbc
CREATE TABLE mvp_dim_anticheat_args_all (
id BIGINT,
dt STRING,
cnt_7d INT,
cnt_30d INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver'='com.mysql.jdbc.Driver',
'url' = 'jdbc:mysql://localhost:3306/huyou_oi',
'table-name' =
??
??bloomfilter
.keyBy(_._1).process(new KeyedProcessFunction[String,(String,String),String]()
{
var state:ValueState[BloomFilter[CharSequence]]= null
override def open(parameters: Configuration): Unit = {
val stateDesc = new
kafka tablescan.startup.modeCREATE TABLE kafkaTable (
user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts
TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id'
"user-zh" https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
> Best,
> Congxian
>
>
> op <520075...@qq.com> ??2020??7??8??
3:53??
>
amp;gt; amp;amp;g
??
----
??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
Best,
Congxian
op <520075...@qq.com> ??2020??7??8?? 3:
gxian Qiu"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
Best,
Congxian
op <520075...@qq.com ??2020??7??8?? 3:53??
??Cache??
--nbsp
??Cache??
----
??:"Congxian Qiu"
ValueState[Cache]??value
map??cacheputupdatestate??cache??1
te back with key, value. All fields of the key are present in the value as well.
?? 2020??7??717:01??op <520075...@qq.com ??
hi??
flink sql ??kafka??key
kafka connectorkey??
nbsp;
hi??
flink sql ??kafka??key
kafka connectorkey??
??sql?? select day,
count(id),
sum(v1) from
(
select
day ,
id ,
sum(v1) v1 from source
group by day,
id
)t
group by day
tConfig.setIdleStateRetentionTime(Time.minutes(1440),Time.minutes(1450))
hi
flink1.10??when I want to sink data to an hbase table like this??
bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
rowkey String,
info ROW
??Blinkplanner??oldplanner??1.10
package test.table.sql
import java.util.Properties
import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import
??oldPlannerIdleStateRetentionTime??join??blinkplannerbug
hi??
..
val tConfig =
bstEnv.getConfigconfg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))..val
q1=bstEnv.sqlQuery(
"""select createTime,feedid from source
|where circleName is not null
|and circleName not in('','_')
|and action =
??:
1.?? hbase connector ?? hbase 1.4.3
2.??1.2.0??connector
??
48 matches
Mail list logo