我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我们使用Nifi将数据采集到kafka,然后使用Flink处理kafka中的数据,现在Nifi如果将多条数据当做一条记录(写入kafka中的数据格式为csv或者json)写入kafka后(就是一个offset中包含了多条数据),Flink只会处理其中的一条数据?有没有什么办法让Flink都处理一个offset中的多条数据?
yinghua...@163.com
问题已解决
需要在FLink home的lib中引入kafka connector jar包
-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢大佬的回复!以后优先现在邮箱这边讨论发现问题再去提个issue!
我的 idleStateRetention确实是设置3600秒,我先进行测试看看。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗?
按照官方的例子,定义表结构后,是最新的字段值?
能否同时保留before和after?
已经解决了,去掉循环,把每个kafka topic单独处理,再union
在 2021-06-01 08:54:42,"13631283359" <13631283...@163.com> 写道:
大家好,
我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
代码如下:
/**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param
Hi,
先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。
目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:
1. 单次checkpoint文件数目过多,JM单点删除跟不上相关速度
2. 整体checkpoint
Thank you for your reply!
您所说的kafka connector 是*flink-connector-kafka_2.11*
这个依赖吗?这个是Datastream所用的依赖,我在pom中已经引入了* flink-sql-connector-kafka_2.11*依赖了。
我试了引入* flink-connector-kafka_2.11*,但还是会报错的。
-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Thank you for your reply!
您所说的kafka connector 是* flink-connector-kafka_2.11*
这个依赖吗?这个是Datastream所用的依赖,我在pom中已经引入了 *flink-sql-connector-kafka_2.11*依赖了。
我试了引入* flink-connector-kafka_2.11*,但还是会报错的。
-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
没有更好的方式吗,这样治标不治本,那我大状态任务会有很多问题
--
Sent from: http://apache-flink.147419.n8.nabble.com/
没有更好的方式吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
关闭 增量checkpoint
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi Jacob,
Maybe you miss the kafka connector dependency in your pom,
you could refer to this url :
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
Best,
LakeShen
Jacob <17691150...@163.com> 于2021年6月1日周二 上午9:54写道:
> Dear All,
>
> 我在使用Flink
Dear All,
我在使用Flink SQL消费kafka并写入hive时报错,代码和报错信息如下
其中,Flink 版本为1.11.2,kafka connector使用的是flink-sql-connector-kafka。
搜索了很久,依然没有解决,相同的sql语句在flink sql clients可以执行,但在Java代码中,提交到hadoop集群后就报错如下:
请指教
*Java Code*
TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS
有没有大佬帮忙看看
--
Sent from: http://apache-flink.147419.n8.nabble.com/
大家好,
我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
代码如下:
/**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env
Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics:
List[String], env:
大家好,
我使用 Flink 1.10.1 并尝试使用 Flink State Processor API 从Savepoint读取 flink state 状态。
当状态Type 是普通 Java type或 Java POJOs时, 运行良好。
当 Avro 生成的 Java class 用作状态类型 state type时,不工作。
在这种Avro class情况下是否需要额外的序列化 serializers?
谢谢
谭民
或许你可以参考这个:
[image: image.png]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/
Best,
LakeShen
chenchencc <1353637...@qq.com> 于2021年5月28日周五 下午4:30写道:
> 想问下state ttl能针对单表设置吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
在增量 checkpoint 下,你可以简单理解状态几乎都存在 checkpoint 目录中的 shared 目录,
所以即使清理 checkpoint,也只是先将这次 checkpoint 引用的相关文件句柄的引用数减1,
只有一个文件没有 checkpoint 引用它时,才会真正删除该文件。
Best,
LakeShen.
刘建刚 于2021年5月28日周五 下午7:03写道:
> 增量快照的原理是sst文件共享,系统会自动帮助你管理sst文件的引用,类似java的引用,并不会因为一个快照删除了就会把实际的数据删除掉。
> 也就不会发生你说的情况
>
> tison
Hi,或许 Flink SQL interval join 能够满足你的需求。
Best,
LakeShen.
Shuo Cheng 于2021年5月31日周一 下午12:10写道:
> state ttl 只能是全局算子维度, table.exec.state.ttl
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
看下你的 Flink 版本是多少,如果是高版本的话,社区有提供 DataStream 的 HBase Sink。
Best,
LakeShen.
Zorro 于2021年5月31日周一 下午2:41写道:
> 由于你的DDL是变化的,无法提前预知所有字段,所以首先可以确定的是这个场景无法使用Flink SQL解决。
>
> 如果使用DataStream解决的话是可行的,唯一可能存在的问题就是目前社区没有提供DataStream的HBase sink。
> 如果你需要在DataStream中使用HBase sink的话,可能需要你自定义一个HBase sink或者基于社区的HBase
大家好:
最近在生产中,不同项目组的两个flink程序在同一时间段都报下面异常
ERROR org.apache.flink.runtime.blob.BlobServerConnection -Error while
excuting Blob connection
.
.
.
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
:Adjusted frame length exceeds 10485760: 1347375960 -discarded
我也遇到了 请问你解决了没
--
Sent from: http://apache-flink.147419.n8.nabble.com/
版本flink 1.11.2
EnvironmentSettings build =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(build);
HA在ZK里面记录了最后一次成功的checkpoint counter和地址,没有启用HA的话,就是从指定的savepoint恢复的。
Best,
Yang
刘建刚 于2021年5月28日周五 下午6:51写道:
> 那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。
>
> 董建 <62...@163.com> 于2021年5月28日周五 下午6:24写道:
>
> > 稳定复现
> > checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
> > 我们jobmanager没有做ha,不知道是否是这个原因导致的?
> >
你可以describe一下失败JM的pod发出来,看看生成的启动命令是不是正确的
Best,
Yang
fz 于2021年5月28日周五 下午10:09写道:
> 镜像: flink:1.13.0-scala_2.11
>
> sed: cannot rename /opt/flink/conf/sed1yRdDY: Device or resource busy
> sed: cannot rename /opt/flink/conf/sed03zP3W: Device or resource busy
> /docker-entrypoint.sh: line 73:
一、环境:
1、版本:1.12.0
2、flink sql
3、已经设置了setIdleStateRetention 为1小时
4、状态后端是rocksDB, 增量模式
5、源数据没有数据激增情况,任务已经跑了两天
二、详情
具体sql见第三大点,就是普通的group by统计的
sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。
由于你的DDL是变化的,无法提前预知所有字段,所以首先可以确定的是这个场景无法使用Flink SQL解决。
如果使用DataStream解决的话是可行的,唯一可能存在的问题就是目前社区没有提供DataStream的HBase sink。
如果你需要在DataStream中使用HBase sink的话,可能需要你自定义一个HBase sink或者基于社区的HBase SQL
connector做一些更改。不过这些更改是需要在Java代码层面的。
至于其他的处理逻辑可以用pyFlink很方便的改写。
--
Sent from:
history server??
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/advanced/historyserver/
----
??:
29 matches
Mail list logo