Re: Re: group agg 开启了mini batch之后,state ttl不生效的问题
这个是是非窗口下agg的优化,目前窗口聚合agg还不支持mini batch > -原始邮件- > 发件人: "Tianwang Li" > 发送时间: 2020-09-30 14:32:39 (星期三) > 收件人: user-zh@flink.apache.org > 抄送: > 主题: Re: group agg 开启了mini batch之后,state ttl不生效的问题 > > 这种有窗口统计没有影响吧? > > > 刘建刚 于2020年9月30日周三 下午2:25写道: > > > 修复方案参考https://github.com/apache/flink/pull/11830 > > > > kandy.wang 于2020年9月30日周三 下午2:19写道: > > > > > group agg 开启了mini batch之后,state ttl不生效的问题: > > > > > > > > > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink > > > 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 > > 十几万。 > > > > > > > > > sql-client-defaults.yaml对应的参数应该是这2个吧: > > > # minimum idle state retention in ms > > > min-idle-state-retention: 0 > > > # maximum idle state retention in ms > > > max-idle-state-retention: 0 > > > 这个现在进展如何了,这个社区打算什么时候支持 > > > > > > > > > > > > > > > > > -- > ** > tivanli > ** -- 刘大龙 浙江大学 控制系 智能系统与控制研究所 工控新楼217 地址:浙江省杭州市浙大路38号浙江大学玉泉校区 Tel:18867547281
Re:flinksql注册udtf使用ROW类型做为输出输出时出错
上面最后说的splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()])需要改成这个地方splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])udtf的第三个参数, 好像只要是能跟sink的字段类型对应就能运行, 但是第二个参数并不能跟source字段对应却能运行就感觉有点奇怪 在 2020-09-30 19:07:06,"chenxuying" 写道: >版本: >pyflink==1.0 >apache-flink==1.11.2 >代码如下: >env = StreamExecutionEnvironment.get_execution_environment() >env.set_parallelism(1) >t_env = StreamTableEnvironment.create(env) >t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", > 'true') > > >class SplitStr(TableFunction): >def eval(self, data): >for row in data: >yield row[0], row[1] >splitStr = udtf( >SplitStr(), >DataTypes.ARRAY( >DataTypes.ROW( >[ >DataTypes.FIELD("name", DataTypes.STRING()), >DataTypes.FIELD("id", DataTypes.STRING()) >] >) >), >DataTypes.ROW( >[ >DataTypes.FIELD("name", DataTypes.STRING()), >DataTypes.FIELD("id", DataTypes.STRING()) >] >) >) >t_env.register_function("splitStr", splitStr) > > >t_env.sql_update(""" >CREATE TABLE mySource ( > >id varchar, >data array> >) WITH ( >'connector' = 'kafka', >'topic' = 'mytesttopic', >'properties.bootstrap.servers' = '172.17.0.2:9092', >'properties.group.id' = 'flink-test-cxy', >'scan.startup.mode' = 'latest-offset', >'format' = 'json' >) >""") >t_env.sql_update(""" >CREATE TABLE mysqlsink ( >id varchar >,name varchar >,age varchar >) >with ( >'connector' = 'print' >) >""") >t_env.sql_update("insert into mysqlsink select id,name,age from mySource >,LATERAL TABLE(splitStr(data)) as T(name, age)") >t_env.execute("test") > > >最终报错 >TypeError: Invalid result_type: result_type should be DataType but contains >RowField(name, VARCHAR) >报错的地方是 >File >"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py", > line 264, in __init__ > > >def __init__(self, func, input_types, result_types, deterministic=None, >name=None): >super(UserDefinedTableFunctionWrapper, self).__init__( >func, input_types, deterministic, name) > > >if not isinstance(result_types, collections.Iterable): >result_types = [result_types] > > >for result_type in result_types: >if not isinstance(result_type, DataType): >raise TypeError( >"Invalid result_type: result_type should be DataType but contains {}".format( >result_type)) > > >self._result_types = result_types >self._judtf_placeholder = None > > >断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗 > > >另外的,假如我在 >上面在创建udtf的时候,如果这样写 >splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), >DataTypes.BIGINT()]) >却可以正常运行,但是显然类型跟我实际运行的不对应
flinksql注册udtf使用ROW类型做为输出输出时出错
版本: pyflink==1.0 apache-flink==1.11.2 代码如下: env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true') class SplitStr(TableFunction): def eval(self, data): for row in data: yield row[0], row[1] splitStr = udtf( SplitStr(), DataTypes.ARRAY( DataTypes.ROW( [ DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("id", DataTypes.STRING()) ] ) ), DataTypes.ROW( [ DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("id", DataTypes.STRING()) ] ) ) t_env.register_function("splitStr", splitStr) t_env.sql_update(""" CREATE TABLE mySource ( id varchar, data array> ) WITH ( 'connector' = 'kafka', 'topic' = 'mytesttopic', 'properties.bootstrap.servers' = '172.17.0.2:9092', 'properties.group.id' = 'flink-test-cxy', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """) t_env.sql_update(""" CREATE TABLE mysqlsink ( id varchar ,name varchar ,age varchar ) with ( 'connector' = 'print' ) """) t_env.sql_update("insert into mysqlsink select id,name,age from mySource ,LATERAL TABLE(splitStr(data)) as T(name, age)") t_env.execute("test") 最终报错 TypeError: Invalid result_type: result_type should be DataType but contains RowField(name, VARCHAR) 报错的地方是 File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py", line 264, in __init__ def __init__(self, func, input_types, result_types, deterministic=None, name=None): super(UserDefinedTableFunctionWrapper, self).__init__( func, input_types, deterministic, name) if not isinstance(result_types, collections.Iterable): result_types = [result_types] for result_type in result_types: if not isinstance(result_type, DataType): raise TypeError( "Invalid result_type: result_type should be DataType but contains {}".format( result_type)) self._result_types = result_types self._judtf_placeholder = None 断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗 另外的,假如我在 上面在创建udtf的时候,如果这样写 splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()]) 却可以正常运行,但是显然类型跟我实际运行的不对应
Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒
感谢回复,这个任务重启了之后看不到这个in/out指标数据, 我们能查到这个任务依赖的redis的连接查询次数也降低了,好像是任务假死一样 一直在消费数据但是就是不处理数据 没有和redis进行交互 tison 于2020年9月30日周三 下午5:34写道: > 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了... > > 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink > 有问题,比如可能依赖了外部环境或者内部积累错误等等。 > > Best, > tison. > > > Yang Peng 于2020年9月30日周三 下午5:26写道: > > > 感谢回复,是的,之前确实怀疑是业务逻辑导致的 > > 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题 > > > > tison 于2020年9月30日周三 下午2:33写道: > > > > > Hi Yang, > > > > > > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么? > > > > > > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。 > > > > > > Best, > > > tison. > > > > > > > > > Yang Peng 于2020年9月30日周三 上午10:29写道: > > > > > > > 感谢回复,我们看了consumer的lag很小 > > > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的 > > > > 而且任务重启了没法jstack判断了 > > > > > > > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道: > > > > > > > > > > > > > > > > > > > > > > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗 > > > > > 也可以 jstack 采下堆栈看下,GC等看下。 > > > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。 > > > > > Best, > > > > > Hailong Wang > > > > > 在 2020-09-29 20:06:50,"Yang Peng" 写道: > > > > > > > > > > > > > > > > > > > > >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 > > > > > >flinkkafkaconsumer消费的并行度也是90 > > 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗? > > > > > > > > > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道: > > > > > > > > > > > >> > > > > > >> > > > > > >> > > > > > >> Hi Yang Peng: > > > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况: > > > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。 > > > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。 > > > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。 > > > > > >> Best, > > > > > >> Hailong Wang > > > > > >> > > > > > >> 在 2020-09-29 19:44:44,"Yang Peng" > 写道: > > > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1 > > > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题 > > > > > >> >kafka消费没有积压,也没有反压, > > > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了 > > > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、 > > > > > >> > > > > > > > > > > > > > > >
Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒
感谢回复,是的 就是从输出降低的时间点开始重新消费,是输出变少了 有些没有输出 运行任务的机器的网卡流量也变小了,监控系统是没有问题 其他任务的监控都正常 tison 于2020年9月30日周三 下午5:37写道: > > 故障点的意思是从开始跌的地方重新消费吗?如果是这样那就是有问题,可以看看之前输出变少是正确数据输出慢了还是有些没输出了,慢了就得看看当时的环境,应该还是会有什么网络或者负载有波动的,没有可能就要怀疑监控系统有问题了;少输出了就是错了,可能是依赖的外部环境不稳定等等。 > > Best, > tison. > > > tison 于2020年9月30日周三 下午5:33写道: > > > 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了... > > > > 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink > > 有问题,比如可能依赖了外部环境或者内部积累错误等等。 > > > > Best, > > tison. > > > > > > Yang Peng 于2020年9月30日周三 下午5:26写道: > > > >> 感谢回复,是的,之前确实怀疑是业务逻辑导致的 > >> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题 > >> > >> tison 于2020年9月30日周三 下午2:33写道: > >> > >> > Hi Yang, > >> > > >> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么? > >> > > >> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。 > >> > > >> > Best, > >> > tison. > >> > > >> > > >> > Yang Peng 于2020年9月30日周三 上午10:29写道: > >> > > >> > > 感谢回复,我们看了consumer的lag很小 > >> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的 > >> > > 而且任务重启了没法jstack判断了 > >> > > > >> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道: > >> > > > >> > > > > >> > > > > >> > > > > >> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗 > >> > > > 也可以 jstack 采下堆栈看下,GC等看下。 > >> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。 > >> > > > Best, > >> > > > Hailong Wang > >> > > > 在 2020-09-29 20:06:50,"Yang Peng" 写道: > >> > > > > >> > > > > >> > > > >> > > >> > >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 > >> > > > >flinkkafkaconsumer消费的并行度也是90 > >> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗? > >> > > > > > >> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道: > >> > > > > > >> > > > >> > >> > > > >> > >> > > > >> > >> > > > >> Hi Yang Peng: > >> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况: > >> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。 > >> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。 > >> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。 > >> > > > >> Best, > >> > > > >> Hailong Wang > >> > > > >> > >> > > > >> 在 2020-09-29 19:44:44,"Yang Peng" > 写道: > >> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 > 线上kafka集群为1.1.1 > >> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题 > >> > > > >> >kafka消费没有积压,也没有反压, > >> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了 > >> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、 > >> > > > >> > >> > > > > >> > > > >> > > >> > > >
有docker jobmanager HA的 1.11.2 docker-entrypoint.sh
大家好, 有有docker jobmanager HA的 1.11.2 docker-entrypoint.sh脚本吗? 在官方的github里没有看到。 另外,有没有docker-compose.yml的配HA的例子吗? 谢谢。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒
故障点的意思是从开始跌的地方重新消费吗?如果是这样那就是有问题,可以看看之前输出变少是正确数据输出慢了还是有些没输出了,慢了就得看看当时的环境,应该还是会有什么网络或者负载有波动的,没有可能就要怀疑监控系统有问题了;少输出了就是错了,可能是依赖的外部环境不稳定等等。 Best, tison. tison 于2020年9月30日周三 下午5:33写道: > 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了... > > 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink > 有问题,比如可能依赖了外部环境或者内部积累错误等等。 > > Best, > tison. > > > Yang Peng 于2020年9月30日周三 下午5:26写道: > >> 感谢回复,是的,之前确实怀疑是业务逻辑导致的 >> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题 >> >> tison 于2020年9月30日周三 下午2:33写道: >> >> > Hi Yang, >> > >> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么? >> > >> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。 >> > >> > Best, >> > tison. >> > >> > >> > Yang Peng 于2020年9月30日周三 上午10:29写道: >> > >> > > 感谢回复,我们看了consumer的lag很小 >> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的 >> > > 而且任务重启了没法jstack判断了 >> > > >> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道: >> > > >> > > > >> > > > >> > > > >> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗 >> > > > 也可以 jstack 采下堆栈看下,GC等看下。 >> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。 >> > > > Best, >> > > > Hailong Wang >> > > > 在 2020-09-29 20:06:50,"Yang Peng" 写道: >> > > > >> > > > >> > > >> > >> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 >> > > > >flinkkafkaconsumer消费的并行度也是90 >> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗? >> > > > > >> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道: >> > > > > >> > > > >> >> > > > >> >> > > > >> >> > > > >> Hi Yang Peng: >> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况: >> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。 >> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。 >> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。 >> > > > >> Best, >> > > > >> Hailong Wang >> > > > >> >> > > > >> 在 2020-09-29 19:44:44,"Yang Peng" 写道: >> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1 >> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题 >> > > > >> >kafka消费没有积压,也没有反压, >> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了 >> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、 >> > > > >> >> > > > >> > > >> > >> >
Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒
那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了... 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink 有问题,比如可能依赖了外部环境或者内部积累错误等等。 Best, tison. Yang Peng 于2020年9月30日周三 下午5:26写道: > 感谢回复,是的,之前确实怀疑是业务逻辑导致的 > 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题 > > tison 于2020年9月30日周三 下午2:33写道: > > > Hi Yang, > > > > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么? > > > > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。 > > > > Best, > > tison. > > > > > > Yang Peng 于2020年9月30日周三 上午10:29写道: > > > > > 感谢回复,我们看了consumer的lag很小 > > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的 > > > 而且任务重启了没法jstack判断了 > > > > > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道: > > > > > > > > > > > > > > > > > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗 > > > > 也可以 jstack 采下堆栈看下,GC等看下。 > > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。 > > > > Best, > > > > Hailong Wang > > > > 在 2020-09-29 20:06:50,"Yang Peng" 写道: > > > > > > > > > > > > > > >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 > > > > >flinkkafkaconsumer消费的并行度也是90 > 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗? > > > > > > > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道: > > > > > > > > > >> > > > > >> > > > > >> > > > > >> Hi Yang Peng: > > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况: > > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。 > > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。 > > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。 > > > > >> Best, > > > > >> Hailong Wang > > > > >> > > > > >> 在 2020-09-29 19:44:44,"Yang Peng" 写道: > > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1 > > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题 > > > > >> >kafka消费没有积压,也没有反压, > > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了 > > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、 > > > > >> > > > > > > > > > >
Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒
感谢回复,是的,之前确实怀疑是业务逻辑导致的 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题 tison 于2020年9月30日周三 下午2:33写道: > Hi Yang, > > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么? > > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。 > > Best, > tison. > > > Yang Peng 于2020年9月30日周三 上午10:29写道: > > > 感谢回复,我们看了consumer的lag很小 > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的 > > 而且任务重启了没法jstack判断了 > > > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道: > > > > > > > > > > > > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗 > > > 也可以 jstack 采下堆栈看下,GC等看下。 > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。 > > > Best, > > > Hailong Wang > > > 在 2020-09-29 20:06:50,"Yang Peng" 写道: > > > > > > > > > >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 > > > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗? > > > > > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道: > > > > > > > >> > > > >> > > > >> > > > >> Hi Yang Peng: > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况: > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。 > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。 > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。 > > > >> Best, > > > >> Hailong Wang > > > >> > > > >> 在 2020-09-29 19:44:44,"Yang Peng" 写道: > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1 > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题 > > > >> >kafka消费没有积压,也没有反压, > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了 > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、 > > > >> > > > > > >
回复:flink任务yarn perjob 提交任务如何设置job name
-m yarn-cluster -ynm name 这个可以设置 yarn-application 的名字 -- 发件人:丁浩浩 <18579099...@163.com> 发送时间:2020年9月30日(星期三) 15:44 收件人:user-zh@flink.apache.org 主 题:flink任务yarn perjob 提交任务如何设置job name 如题,我需要设置flink提交到yarn的job name应该怎么设置呢?
Re: flink任务yarn perjob 提交任务如何设置job name
代码里 env.execute("你的作业名") Best, tison. 丁浩浩 <18579099...@163.com> 于2020年9月30日周三 下午3:44写道: > 如题,我需要设置flink提交到yarn的job name应该怎么设置呢?
flink任务yarn perjob 提交任务如何设置job name
如题,我需要设置flink提交到yarn的job name应该怎么设置呢?
回复:Flink on K8s statebackend 配置
补充一下,我的错误日志 Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please seehttps://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/. 应该是没有Hadoop的路径,这个在K8s下面 该怎么去配置呢 | | superainbower | | superainbo...@163.com | 签名由网易邮箱大师定制 在2020年09月30日 14:33,superainbower 写道: Hi,all 请教下,哪个朋友知道Flink on K8s上做 statebackend 配置,除了将下列配置写到flink-conf.yml里,还需要作哪些工作? state.backend: rocksdb state.checkpoints.dir: hdfs://master:8020/flink/checkpoints state.savepoints.dir: hdfs://master:8020/flink/savepoints state.backend.incremental: true | | superainbower | | superainbo...@163.com | 签名由网易邮箱大师定制
Flink on K8s statebackend 配置
Hi,all 请教下,哪个朋友知道Flink on K8s上做 statebackend 配置,除了将下列配置写到flink-conf.yml里,还需要作哪些工作? state.backend: rocksdb state.checkpoints.dir: hdfs://master:8020/flink/checkpoints state.savepoints.dir: hdfs://master:8020/flink/savepoints state.backend.incremental: true | | superainbower | | superainbo...@163.com | 签名由网易邮箱大师定制
Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒
Hi Yang, 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么? 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。 Best, tison. Yang Peng 于2020年9月30日周三 上午10:29写道: > 感谢回复,我们看了consumer的lag很小 > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的 > 而且任务重启了没法jstack判断了 > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道: > > > > > > > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗 > > 也可以 jstack 采下堆栈看下,GC等看下。 > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。 > > Best, > > Hailong Wang > > 在 2020-09-29 20:06:50,"Yang Peng" 写道: > > > > > >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 > > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗? > > > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道: > > > > > >> > > >> > > >> > > >> Hi Yang Peng: > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况: > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。 > > >> 2. Source 的序列化耗时严重,导致拉取变慢。 > > >> 可以尝试着扩kafka 分区,加大Source并发看下。 > > >> Best, > > >> Hailong Wang > > >> > > >> 在 2020-09-29 19:44:44,"Yang Peng" 写道: > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1 > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题 > > >> >kafka消费没有积压,也没有反压, > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了 > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、 > > >> > > >
Re: group agg 开启了mini batch之后,state ttl不生效的问题
这种有窗口统计没有影响吧? 刘建刚 于2020年9月30日周三 下午2:25写道: > 修复方案参考https://github.com/apache/flink/pull/11830 > > kandy.wang 于2020年9月30日周三 下午2:19写道: > > > group agg 开启了mini batch之后,state ttl不生效的问题: > > > > > > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink > > 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 > 十几万。 > > > > > > sql-client-defaults.yaml对应的参数应该是这2个吧: > > # minimum idle state retention in ms > > min-idle-state-retention: 0 > > # maximum idle state retention in ms > > max-idle-state-retention: 0 > > 这个现在进展如何了,这个社区打算什么时候支持 > > > > > > > > > -- ** tivanli **
Re: group agg 开启了mini batch之后,state ttl不生效的问题
修复方案参考https://github.com/apache/flink/pull/11830 kandy.wang 于2020年9月30日周三 下午2:19写道: > group agg 开启了mini batch之后,state ttl不生效的问题: > > > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink > 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。 > > > sql-client-defaults.yaml对应的参数应该是这2个吧: > # minimum idle state retention in ms > min-idle-state-retention: 0 > # maximum idle state retention in ms > max-idle-state-retention: 0 > 这个现在进展如何了,这个社区打算什么时候支持 > > > >
group agg 开启了mini batch之后,state ttl不生效的问题
group agg 开启了mini batch之后,state ttl不生效的问题: 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。 sql-client-defaults.yaml对应的参数应该是这2个吧: # minimum idle state retention in ms min-idle-state-retention: 0 # maximum idle state retention in ms max-idle-state-retention: 0 这个现在进展如何了,这个社区打算什么时候支持