Flink PB序列化疑惑

2020-05-11 文章 chanamper
大家好,请教一下Flink序列化相关的问题。 DataStream windowCounts = env .addSource() .transform(); 我flink任务程序中的对象为Record,Record下的Head字段为PB协议的数据,对Head类注册了PB序列化方式。 env.getConfig().registerTypeWithKryoSerializer( Head.class, ProtobufSerializer.class);

回复:Flink keyby数据倾斜问题

2020-04-06 文章 chanamper
多谢,我试试哈! | | 陈建平 | | 邮箱:chanam...@163.com | 签名由 网易邮箱大师 定制 在2020年04月05日 22:18,戴嘉诚 写道: 你好 可以参考一下这个链接的思路 https://blog.csdn.net/IT_Lee_J_H/article/details/88641894 发自我的iPhone > 在 2020年4月4日,18:15,chanamper 写道: > > Dear All, >大家好,请教一下。目前针对Java Api的方式,对于Flink keyby情况存在key数

Flink keyby数据倾斜问题

2020-04-04 文章 chanamper
Dear All, 大家好,请教一下。目前针对Java Api的方式,对于Flink keyby情况存在key数据倾斜有啥实现优化思路吗?看官方文档目前在table api和sql层面,有Minibatch Aggregation和Local Global Aggregation方式的实现,针对Java Api的方式有啥办法可以达到local global aggregation的效果吗? 多谢!

Flink 1.10 Local Aggregate问题

2020-03-29 文章 chanamper
Dear All, 请教一下,Flink 1.10版本中Java Api如何实现Local Aggregate的功能呢?数据存在较大的倾斜,想在keyby前进行一次local aggregate, 看了下在https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink有计划实现localKeyBy()方法, flink 1.10 java api中还没发现对应的localKeyBy方法。请问下,目前Java

Flink 1.10 Java API local Aggregate问题

2020-03-29 文章 chanamper
Dear All,

flink rocksdb反压问题的处理

2020-02-25 文章 chanamper
hi all, 我采用flink 1.10版本、rocksdb的状态后端,任务运行一段时间后出现反压的情况,导致任务的处理性能急剧下降,想请教一下有啥解决rocksdb状态后端下反压情况的优化手段吗?多谢

Flink读写kafka数据聚集任务失败问题

2020-02-23 文章 chanamper
大家好,请教一下,flink任务读取kafka数据进行聚集操作后将结果写回kafka,flink版本为1.8.0。任务运行一段时间后出现如下异常,之后flink任务异常挂掉,请问一下这个问题该如何解决呢?多谢 2020-02-19 10:45:45,314 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions java.io.IOException: Connection reset by peer

flink rocksdb状态后端物理内存溢出的问题

2020-02-20 文章 chanamper
请教一下,我采用flink 1.8版本,状态后端采用rocksdb方式,任务运行一段时间后containter会出现物理内存溢出,单个containter的内存为10G、堆内存使用很少仅1G左右。这种情况下我应该如何分析内存占用情况?

关于使用Flink RocksDBStateBackend问题

2020-01-14 文章 chanamper
Hi 在使用RocksDBStateBackend过程中,有些问题想请教一下 采用自定义的optionsFactory后,flink-conf.yaml配置文件中的RocksDB Configurable Options相关参数配置不生效,请问一下要如何使其中的state.backend.rocksdb.writebuffer.size等参数生效? OptionsFactory optionsFactory = new OptionsFactory() { @Override public DBOptions createDBOptions(DBOptions

Flink运行一段时间后出现异常重启

2019-12-15 文章 chanamper
请教一下,flink采用1.8.0版本,任务运行一段时间后出现AskTimeoutException异常,之后flink状态由Running转Failed,紧接着出现FlinkKafkaException:Failed to send data to kafka失败,任务定期出现这种异常重启。请教一下,这种通常有什么问题导致的呀,应该如何排除处理? 2019-12-15 19:25:14,954 WARN org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl - Requesting