建议关闭state.backend.incremental ,因为我遇到过,开启增量ckp导致ckp一直增大,关掉就正常了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
掐头去尾的提问,完全不知道是什么问题,没法回答你,最好是贴出代码,贴出图片等大家才能帮忙分析
--
Sent from: http://apache-flink.147419.n8.nabble.com/
一般是 Job A出现背压了,checkpoint的时候是要等背压的数据都处理完了才会处理barrier。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
那会一直增大下去吗,我跑了4天,ckp一直变大,没有稳定的迹象。是不是我需要调整compaction的配置
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
关闭 增量checkpoint
--
Sent from: http://apache-flink.147419.n8.nabble.com/
使用cdc的方式读取
mysql的更新数据,使用connect流的方式连接两个流,这样可以避免掉watermark才能触发计算的问题。建议在事实表join不到数据的时候
进行缓存用 onTime进行触发。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
json格式改debezium-json试试
--
Sent from: http://apache-flink.147419.n8.nabble.com/
不需要提供全部字段
--
Sent from: http://apache-flink.147419.n8.nabble.com/
tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info"));
tableEnv
.toRetractStream(tableEnv.from("order_info"), Row.class)
.filter((FilterFunction>)
booleanRowTuple2 -> booleanRowTuple2.f0)
.map((MapFunction, Row>)
看源码是在
catch里面的(应该是在executeCheckpointing的时候报错了,但是catch里面还有一个nullpoint没catch导致程序退出):
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish
synchronous part of checkpoint {}. " +
"Alignment duration:
这部分没人报bug,所以社区一般是不会去验证的。所以你要给出你的代码,同时对问题描述尽量清晰,否则很难有人去自己写代码验证问题。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
tableEnv.explainSql("");
--
Sent from: http://apache-flink.147419.n8.nabble.com/
中间有错误数据或者其他错误原因,背压不会导致数据丢失
--
Sent from: http://apache-flink.147419.n8.nabble.com/
查看发现
org.apache.avro
avro-maven-plugin
${avro.version}
generate-sources
你试过吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
没解决,我只能把它关闭了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
理论上只要实现了LookupTableSource。你在 TableFunction 里面怎么重写 eval 都可以,不管你是要读取哪里的数据怎么读。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark
--
Sent from: http://apache-flink.147419.n8.nabble.com/
toAppendDataStream试试看
--
Sent from: http://apache-flink.147419.n8.nabble.com/
问题一: Interval Join doesn't support consuming update and delete
changes是因为输入A或者B是一个更新流
问题二:interval
join使用buffer来缓存A和B的数据,没有放在state里,只有在watermark超过下边界会触发onEventtime清理 buffer。
延迟问题:没有类似statettl的配置,interval join不受statettl控制清除状态
乱序问题:如果 B的数据时间小于 watermark则匹配不到,一切是跟watermmark相关
以上个人看源码理解的。希望有帮助
--
可能需要修改源码:
https://blog.csdn.net/young_0609/article/details/110407781
--
Sent from: http://apache-flink.147419.n8.nabble.com/
但是用process-time是有数据的,目前用partition-time一直没成功写出过数据
--
Sent from: http://apache-flink.147419.n8.nabble.com/
这不是一个问题,是我理解的问题。感谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
1: Interval Join doesn't support consuming update and delete changes
是因为A或B是一个update stream
2:
Interval Join 的临时数据是放在buffer中,当wtm超过边界时间就会清理掉 buffer也就join不到了。所以
statettl无法控制A流的缓存数据。
延迟问题:所以如果wtm不更新,A流的数据不会被清理因为不受statettl控制
乱序问题:如果B流的旧时间小于 watermark就join不上
以上是个人理解、、
--
Sent from:
有人知道这个bug吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
1.12默认是 eventtime不需要设置
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好,
1:我设置的时候就是 使用的 partition-time 同时
设定checkpoint间隔为60s。但是我发现watermark一直没有生成或者更新,导致我的数据一直无法commit。想知道
为什么watermark无法生成。当时使用process-time是没问题的。
2:因为写hive的话会有小文件的问题。所以我使用file sink来设置合并文件和控制文件大小。但是写文件是无法写hive
metastore。所以hive查不出数据。
想知道有什么方法解决hive小文件问题,难道只能T+1做小文件合并吗。
--
Sent from:
flink run -yD yarn.application.queue=x
--
Sent from: http://apache-flink.147419.n8.nabble.com/
重要: 在流模式下使用 FileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint
被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。
在官方文档找到了这个,必须要有checkpoint才行,但是我 手动savepoint之后,虽然有sucess文件,但是没有数据
--
Sent from: http://apache-flink.147419.n8.nabble.com/
查看hdfs文件:
分区一直是这样的一个文件,没有生成 _SUCCESS文件
.part-40a2c94d-0437-4666-8d43-31c908aaa02e-0-0.inprogress.73dcc10b-44f4-47e3-abac-0c14bd59f9c9
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好,这个问题已经解决了。
我现在通过官方例子:
SET table.sql-dialect=default;
create table flink_kafka(
sys_time bigint,
rt AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, '-MM-dd HH:mm:ss')),
WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'xx',
当配置好HiveCatalog后,
SQL-Cli 也可以查到hive库表信息
创建kafka表:
create table test.test_kafka(
word VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'xx',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'xx',
'properties.group.id' = 'test',
'format' = 'json',
GroupWindowAggregate不支持update或者delete的datasource。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你不定义watermark好像用的就是pro time
--
Sent from: http://apache-flink.147419.n8.nabble.com/
StatementSet inserts = tableEnv.createStatementSet();
inserts.addInsertSql("insert into xxx select * from xxx") // topic1
-》topic2任务
inserts.addInsertSql("insert into xxx select * from xxx") // topic2
-》Postgre 任务
inserts.execute();
--
Sent from: http://apache-flink.147419.n8.nabble.com/
1:当开启state.backend.incremental 后 Checkpointed Data Size 会不断变大
我10分钟一次checkpoint,每次都增大2M,两天增大到400M,但其实我的实际应该只有20M(只做一个窗口计算)(我做savepoint之后也才20M)。
已设置了 ttl。
2:当我关闭state.backend.incremental 后 。每次checkpoint也就20M左右,不会变大了。
按我的理解:state.backend.incremental 开启后,Checkpointed Data
通过 createTemporarySystemFunction 试试看呢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
把格式调整下,很乱看不明白
--
Sent from: http://apache-flink.147419.n8.nabble.com/
试了 1.12.2,还是一样问题。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
但是看情况好像是只有在:DataStream发生Keyby或者 setParallelism的时候才会发生
--
Sent from: http://apache-flink.147419.n8.nabble.com/
1.12.1
--
Sent from: http://apache-flink.147419.n8.nabble.com/
再试了一下:
修改并行度也不行
.setParallelism(9)
--
Sent from: http://apache-flink.147419.n8.nabble.com/
经过再一次验证:
即使我做group by rowtime的操作,
我对datastream做keyby(rowtime) 也有这个问题
例如:
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test "));
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
.keyby(_.f1)
https://github.com/zhp8341/flink-streaming-platform-web
这个你可以参考下
--
Sent from: http://apache-flink.147419.n8.nabble.com/
1:当DataStream是由 一个table 经过 group by rowtime 转换过来的就无法触发窗口
例如:
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test group by msg,rowtime"));
// 获得 DataStream,并定义wtm生成
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
定义一个 sourcetable
--
Sent from: http://apache-flink.147419.n8.nabble.com/
为什么4次是没问题的,感觉只执行一次才是最优的
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你这代码贴的乱七八糟。。。
你需要再richjoinfunction里面设置valuestate的生命周期,他不随着窗口而销毁,窗口只会销毁自己设定的state,有空你可以看看window的源码,里面有清理state的逻辑
--
Sent from: http://apache-flink.147419.n8.nabble.com/
currentMaxTimestamp 只是当前数据流里面最大,但不一定是全部的最大。
当数据出现延迟,或者多流的情况下,lastEmittedWatermark 不一定会比 currentMaxTimestamp 小
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我做了。。
添加了一个sql语法类似
"select " +
"msg," +
"count(1) cnt" +
" from test" +
" where msg = 'hello' " +
" group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
" EMIT \n" +
" WITH
我认为这可能是一个bug (当然也可能是故意这样设计的):
在 EvictingWindowOperator.emitWindowContents()位置:
userFunction.process(triggerContext.key, triggerContext.window,
processContext, projectedContents, timestampedCollector);
当timestampedCollector的size = 0时;
执行到 ReduceApplyWindowFunction部分:
public void apply(K k, W
代码如下: evictor设置的在窗口触发前清理所有数据,按理进入sum是没有数据,但是调试的时候发现,sum经过计算会输出 null 进入
print,导致报 Nullpoint。不知道是bug还是我的问题;
class A {
String word;
Long time;
public A(String word, Long time) {
this.word = word;
this.time = time;
}
代码如下:
stream
.keyBy((KeySelector, String>) o -> o.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(100)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(2)))
.evictor(TimeEvictor.of(Time.seconds(0)))
.sum(1)
.print();
当 数据在窗口计算前被全部清除时,sum结果会是一个null,会传入print,导致 nullpoint
--
Sent
当程序使用
evictor(TimeEvictor.of(Time.seconds(0)))
来清除 窗口触发前数据时,当数据全部被清除了,在print时会报Null point
Caused by: java.lang.NullPointerException
at
org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73)
at
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
同一段代码,在main里面可以正常正常,在Test里面却直接结束
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
Pattern.begin("start")
.where(new
SimpleCondition() {
@Override
public boolean filter(KafkaTopicOffsetTimeMsg
kafkaTopicOffsetTimeMsg) throws Exception {
return
是的,需要上传certificate文件,1.12好像没有上传文件的配置
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你说的是es的 xpack 认证吗,需要你载入certificate文件是吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
当我在使用 StreamTableEnvironment Api的时候;
Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092",
"latest"),"topic,offset,msg");
tableEnv.createTemporaryView("test", a);
tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
61 matches
Mail list logo